capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

rpc.c++ (131328B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #include "rpc.h"
     23 #include "message.h"
     24 #include <kj/debug.h>
     25 #include <kj/vector.h>
     26 #include <kj/async.h>
     27 #include <kj/one-of.h>
     28 #include <kj/function.h>
     29 #include <functional>  // std::greater
     30 #include <unordered_map>
     31 #include <map>
     32 #include <queue>
     33 #include <capnp/rpc.capnp.h>
     34 #include <kj/io.h>
     35 #include <kj/map.h>
     36 
     37 namespace capnp {
     38 namespace _ {  // private
     39 
     40 namespace {
     41 
     42 template <typename T>
     43 inline constexpr uint messageSizeHint() {
     44   return 1 + sizeInWords<rpc::Message>() + sizeInWords<T>();
     45 }
     46 template <>
     47 inline constexpr uint messageSizeHint<void>() {
     48   return 1 + sizeInWords<rpc::Message>();
     49 }
     50 
     51 constexpr const uint MESSAGE_TARGET_SIZE_HINT = sizeInWords<rpc::MessageTarget>() +
     52     sizeInWords<rpc::PromisedAnswer>() + 16;  // +16 for ops; hope that's enough
     53 
     54 constexpr const uint CAP_DESCRIPTOR_SIZE_HINT = sizeInWords<rpc::CapDescriptor>() +
     55     sizeInWords<rpc::PromisedAnswer>();
     56 
     57 constexpr const uint64_t MAX_SIZE_HINT = 1 << 20;
     58 
     59 uint copySizeHint(MessageSize size) {
     60   uint64_t sizeHint = size.wordCount + size.capCount * CAP_DESCRIPTOR_SIZE_HINT
     61                     // if capCount > 0, the cap descriptor list has a 1-word tag
     62                     + (size.capCount > 0);
     63   return kj::min(MAX_SIZE_HINT, sizeHint);
     64 }
     65 
     66 uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint, uint additional) {
     67   KJ_IF_MAYBE(s, sizeHint) {
     68     return copySizeHint(*s) + additional;
     69   } else {
     70     return 0;
     71   }
     72 }
     73 
     74 kj::Maybe<kj::Array<PipelineOp>> toPipelineOps(List<rpc::PromisedAnswer::Op>::Reader ops) {
     75   auto result = kj::heapArrayBuilder<PipelineOp>(ops.size());
     76   for (auto opReader: ops) {
     77     PipelineOp op;
     78     switch (opReader.which()) {
     79       case rpc::PromisedAnswer::Op::NOOP:
     80         op.type = PipelineOp::NOOP;
     81         break;
     82       case rpc::PromisedAnswer::Op::GET_POINTER_FIELD:
     83         op.type = PipelineOp::GET_POINTER_FIELD;
     84         op.pointerIndex = opReader.getGetPointerField();
     85         break;
     86       default:
     87         KJ_FAIL_REQUIRE("Unsupported pipeline op.", (uint)opReader.which()) {
     88           return nullptr;
     89         }
     90     }
     91     result.add(op);
     92   }
     93   return result.finish();
     94 }
     95 
     96 Orphan<List<rpc::PromisedAnswer::Op>> fromPipelineOps(
     97     Orphanage orphanage, kj::ArrayPtr<const PipelineOp> ops) {
     98   auto result = orphanage.newOrphan<List<rpc::PromisedAnswer::Op>>(ops.size());
     99   auto builder = result.get();
    100   for (uint i: kj::indices(ops)) {
    101     rpc::PromisedAnswer::Op::Builder opBuilder = builder[i];
    102     switch (ops[i].type) {
    103       case PipelineOp::NOOP:
    104         opBuilder.setNoop();
    105         break;
    106       case PipelineOp::GET_POINTER_FIELD:
    107         opBuilder.setGetPointerField(ops[i].pointerIndex);
    108         break;
    109     }
    110   }
    111   return result;
    112 }
    113 
    114 kj::Exception toException(const rpc::Exception::Reader& exception) {
    115   auto reason = [&]() {
    116     if (exception.getReason().startsWith("remote exception: ")) {
    117       return kj::str(exception.getReason());
    118     } else {
    119       return kj::str("remote exception: ", exception.getReason());
    120     }
    121   }();
    122 
    123   kj::Exception result(static_cast<kj::Exception::Type>(exception.getType()),
    124       "(remote)", 0, kj::mv(reason));
    125   if (exception.hasTrace()) {
    126     result.setRemoteTrace(kj::str(exception.getTrace()));
    127   }
    128   return result;
    129 }
    130 
    131 void fromException(const kj::Exception& exception, rpc::Exception::Builder builder,
    132                    kj::Maybe<kj::Function<kj::String(const kj::Exception&)>&> traceEncoder) {
    133   kj::StringPtr description = exception.getDescription();
    134 
    135   // Include context, if any.
    136   kj::Vector<kj::String> contextLines;
    137   for (auto context = exception.getContext();;) {
    138     KJ_IF_MAYBE(c, context) {
    139       contextLines.add(kj::str("context: ", c->file, ": ", c->line, ": ", c->description));
    140       context = c->next;
    141     } else {
    142       break;
    143     }
    144   }
    145   kj::String scratch;
    146   if (contextLines.size() > 0) {
    147     scratch = kj::str(description, '\n', kj::strArray(contextLines, "\n"));
    148     description = scratch;
    149   }
    150 
    151   builder.setReason(description);
    152   builder.setType(static_cast<rpc::Exception::Type>(exception.getType()));
    153 
    154   KJ_IF_MAYBE(t, traceEncoder) {
    155     builder.setTrace((*t)(exception));
    156   }
    157 
    158   if (exception.getType() == kj::Exception::Type::FAILED &&
    159       !exception.getDescription().startsWith("remote exception:")) {
    160     KJ_LOG(INFO, "returning failure over rpc", exception);
    161   }
    162 }
    163 
    164 uint exceptionSizeHint(const kj::Exception& exception) {
    165   return sizeInWords<rpc::Exception>() + exception.getDescription().size() / sizeof(word) + 1;
    166 }
    167 
    168 // =======================================================================================
    169 
    170 template <typename Id, typename T>
    171 class ExportTable {
    172   // Table mapping integers to T, where the integers are chosen locally.
    173 
    174 public:
    175   kj::Maybe<T&> find(Id id) {
    176     if (id < slots.size() && slots[id] != nullptr) {
    177       return slots[id];
    178     } else {
    179       return nullptr;
    180     }
    181   }
    182 
    183   T erase(Id id, T& entry) {
    184     // Remove an entry from the table and return it.  We return it so that the caller can be
    185     // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
    186     // `entry` is a reference to the entry being released -- we require this in order to prove
    187     // that the caller has already done a find() to check that this entry exists.  We can't check
    188     // ourselves because the caller may have nullified the entry in the meantime.
    189     KJ_DREQUIRE(&entry == &slots[id]);
    190     T toRelease = kj::mv(slots[id]);
    191     slots[id] = T();
    192     freeIds.push(id);
    193     return toRelease;
    194   }
    195 
    196   T& next(Id& id) {
    197     if (freeIds.empty()) {
    198       id = slots.size();
    199       return slots.add();
    200     } else {
    201       id = freeIds.top();
    202       freeIds.pop();
    203       return slots[id];
    204     }
    205   }
    206 
    207   template <typename Func>
    208   void forEach(Func&& func) {
    209     for (Id i = 0; i < slots.size(); i++) {
    210       if (slots[i] != nullptr) {
    211         func(i, slots[i]);
    212       }
    213     }
    214   }
    215 
    216 private:
    217   kj::Vector<T> slots;
    218   std::priority_queue<Id, std::vector<Id>, std::greater<Id>> freeIds;
    219 };
    220 
    221 template <typename Id, typename T>
    222 class ImportTable {
    223   // Table mapping integers to T, where the integers are chosen remotely.
    224 
    225 public:
    226   T& operator[](Id id) {
    227     if (id < kj::size(low)) {
    228       return low[id];
    229     } else {
    230       return high[id];
    231     }
    232   }
    233 
    234   kj::Maybe<T&> find(Id id) {
    235     if (id < kj::size(low)) {
    236       return low[id];
    237     } else {
    238       auto iter = high.find(id);
    239       if (iter == high.end()) {
    240         return nullptr;
    241       } else {
    242         return iter->second;
    243       }
    244     }
    245   }
    246 
    247   T erase(Id id) {
    248     // Remove an entry from the table and return it.  We return it so that the caller can be
    249     // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
    250     if (id < kj::size(low)) {
    251       T toRelease = kj::mv(low[id]);
    252       low[id] = T();
    253       return toRelease;
    254     } else {
    255       T toRelease = kj::mv(high[id]);
    256       high.erase(id);
    257       return toRelease;
    258     }
    259   }
    260 
    261   template <typename Func>
    262   void forEach(Func&& func) {
    263     for (Id i: kj::indices(low)) {
    264       func(i, low[i]);
    265     }
    266     for (auto& entry: high) {
    267       func(entry.first, entry.second);
    268     }
    269   }
    270 
    271 private:
    272   T low[16];
    273   std::unordered_map<Id, T> high;
    274 };
    275 
    276 // =======================================================================================
    277 
    278 class RpcConnectionState final: public kj::TaskSet::ErrorHandler, public kj::Refcounted {
    279 public:
  struct DisconnectInfo {
    // Value delivered through `disconnectFulfiller` when disconnect() tears the connection down.

    kj::Promise<void> shutdownPromise;
    // Task which is working on sending an abort message and cleanly ending the connection.
  };
    284 
    285   RpcConnectionState(BootstrapFactoryBase& bootstrapFactory,
    286                      kj::Maybe<SturdyRefRestorerBase&> restorer,
    287                      kj::Own<VatNetworkBase::Connection>&& connectionParam,
    288                      kj::Own<kj::PromiseFulfiller<DisconnectInfo>>&& disconnectFulfiller,
    289                      size_t flowLimit,
    290                      kj::Maybe<kj::Function<kj::String(const kj::Exception&)>&> traceEncoder)
    291       : bootstrapFactory(bootstrapFactory),
    292         restorer(restorer), disconnectFulfiller(kj::mv(disconnectFulfiller)), flowLimit(flowLimit),
    293         traceEncoder(traceEncoder), tasks(*this) {
    294     connection.init<Connected>(kj::mv(connectionParam));
    295     tasks.add(messageLoop());
    296   }
    297 
    298   kj::Own<ClientHook> restore(AnyPointer::Reader objectId) {
    299     if (connection.is<Disconnected>()) {
    300       return newBrokenCap(kj::cp(connection.get<Disconnected>()));
    301     }
    302 
    303     QuestionId questionId;
    304     auto& question = questions.next(questionId);
    305 
    306     question.isAwaitingReturn = true;
    307 
    308     auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
    309 
    310     auto questionRef = kj::refcounted<QuestionRef>(*this, questionId, kj::mv(paf.fulfiller));
    311     question.selfRef = *questionRef;
    312 
    313     paf.promise = paf.promise.attach(kj::addRef(*questionRef));
    314 
    315     {
    316       auto message = connection.get<Connected>()->newOutgoingMessage(
    317           objectId.targetSize().wordCount + messageSizeHint<rpc::Bootstrap>());
    318 
    319       auto builder = message->getBody().initAs<rpc::Message>().initBootstrap();
    320       builder.setQuestionId(questionId);
    321       builder.getDeprecatedObjectId().set(objectId);
    322 
    323       message->send();
    324     }
    325 
    326     auto pipeline = kj::refcounted<RpcPipeline>(*this, kj::mv(questionRef), kj::mv(paf.promise));
    327 
    328     return pipeline->getPipelinedCap(kj::Array<const PipelineOp>(nullptr));
    329   }
    330 
  void taskFailed(kj::Exception&& exception) override {
    // kj::TaskSet::ErrorHandler callback: a failed background task in `tasks` disconnects the
    // whole connection, with the task's exception as the reason.
    disconnect(kj::mv(exception));
  }
    334 
    335   void disconnect(kj::Exception&& exception) {
    336     // After disconnect(), the RpcSystem could be destroyed, making `traceEncoder` a dangling
    337     // reference, so null it out before we return from here. We don't need it anymore once
    338     // disconnected anyway.
    339     KJ_DEFER(traceEncoder = nullptr);
    340 
    341     if (!connection.is<Connected>()) {
    342       // Already disconnected.
    343       return;
    344     }
    345 
    346     kj::Exception networkException(kj::Exception::Type::DISCONNECTED,
    347         exception.getFile(), exception.getLine(), kj::heapString(exception.getDescription()));
    348 
    349     // Don't throw away the stack trace.
    350     if (exception.getRemoteTrace() != nullptr) {
    351       networkException.setRemoteTrace(kj::str(exception.getRemoteTrace()));
    352     }
    353     for (void* addr: exception.getStackTrace()) {
    354       networkException.addTrace(addr);
    355     }
    356     // If your stack trace points here, it means that the exception became the reason that the
    357     // RPC connection was disconnected. The exception was then thrown by all in-flight calls and
    358     // all future calls on this connection.
    359     networkException.addTraceHere();
    360 
    361     KJ_IF_MAYBE(newException, kj::runCatchingExceptions([&]() {
    362       // Carefully pull all the objects out of the tables prior to releasing them because their
    363       // destructors could come back and mess with the tables.
    364       kj::Vector<kj::Own<PipelineHook>> pipelinesToRelease;
    365       kj::Vector<kj::Own<ClientHook>> clientsToRelease;
    366       kj::Vector<kj::Promise<kj::Own<RpcResponse>>> tailCallsToRelease;
    367       kj::Vector<kj::Promise<void>> resolveOpsToRelease;
    368 
    369       // All current questions complete with exceptions.
    370       questions.forEach([&](QuestionId id, Question& question) {
    371         KJ_IF_MAYBE(questionRef, question.selfRef) {
    372           // QuestionRef still present.
    373           questionRef->reject(kj::cp(networkException));
    374         }
    375       });
    376 
    377       answers.forEach([&](AnswerId id, Answer& answer) {
    378         KJ_IF_MAYBE(p, answer.pipeline) {
    379           pipelinesToRelease.add(kj::mv(*p));
    380         }
    381 
    382         KJ_IF_MAYBE(promise, answer.redirectedResults) {
    383           tailCallsToRelease.add(kj::mv(*promise));
    384         }
    385 
    386         KJ_IF_MAYBE(context, answer.callContext) {
    387           context->requestCancel();
    388         }
    389       });
    390 
    391       exports.forEach([&](ExportId id, Export& exp) {
    392         clientsToRelease.add(kj::mv(exp.clientHook));
    393         KJ_IF_MAYBE(op, exp.resolveOp) {
    394           resolveOpsToRelease.add(kj::mv(*op));
    395         }
    396         exp = Export();
    397       });
    398 
    399       imports.forEach([&](ImportId id, Import& import) {
    400         KJ_IF_MAYBE(f, import.promiseFulfiller) {
    401           f->get()->reject(kj::cp(networkException));
    402         }
    403       });
    404 
    405       embargoes.forEach([&](EmbargoId id, Embargo& embargo) {
    406         KJ_IF_MAYBE(f, embargo.fulfiller) {
    407           f->get()->reject(kj::cp(networkException));
    408         }
    409       });
    410     })) {
    411       // Some destructor must have thrown an exception.  There is no appropriate place to report
    412       // these errors.
    413       KJ_LOG(ERROR, "Uncaught exception when destroying capabilities dropped by disconnect.",
    414              *newException);
    415     }
    416 
    417     // Send an abort message, but ignore failure.
    418     kj::runCatchingExceptions([&]() {
    419       auto message = connection.get<Connected>()->newOutgoingMessage(
    420           messageSizeHint<void>() + exceptionSizeHint(exception));
    421       fromException(exception, message->getBody().getAs<rpc::Message>().initAbort());
    422       message->send();
    423     });
    424 
    425     // Indicate disconnect.
    426     auto shutdownPromise = connection.get<Connected>()->shutdown()
    427         .attach(kj::mv(connection.get<Connected>()))
    428         .then([]() -> kj::Promise<void> { return kj::READY_NOW; },
    429               [origException = kj::mv(exception)](kj::Exception&& e) -> kj::Promise<void> {
    430           // Don't report disconnects as an error.
    431           if (e.getType() == kj::Exception::Type::DISCONNECTED) {
    432             return kj::READY_NOW;
    433           }
    434           // If the error is just what was passed in to disconnect(), don't report it back out
    435           // since it shouldn't be anything the caller doesn't already know about.
    436           if (e.getType() == origException.getType() &&
    437               e.getDescription() == origException.getDescription()) {
    438             return kj::READY_NOW;
    439           }
    440           return kj::mv(e);
    441         });
    442     disconnectFulfiller->fulfill(DisconnectInfo { kj::mv(shutdownPromise) });
    443     connection.init<Disconnected>(kj::mv(networkException));
    444     canceler.cancel(networkException);
    445   }
    446 
  void setFlowLimit(size_t words) {
    // Replaces the flow limit (in words) and then re-checks whether blocked flow can resume,
    // since the new limit may differ from the old one.
    flowLimit = words;
    maybeUnblockFlow();
  }
    451 
    452 private:
    453   class RpcClient;
    454   class ImportClient;
    455   class PromiseClient;
    456   class QuestionRef;
    457   class RpcPipeline;
    458   class RpcCallContext;
    459   class RpcResponse;
    460 
    461   // =======================================================================================
    462   // The Four Tables entry types
    463   //
    464   // We have to define these before we can define the class's fields.
    465 
    466   typedef uint32_t QuestionId;
    467   typedef QuestionId AnswerId;
    468   typedef uint32_t ExportId;
    469   typedef ExportId ImportId;
    470   // See equivalent definitions in rpc.capnp.
    471   //
    472   // We always use the type that refers to the local table of the same name.  So e.g. although
    473   // QuestionId and AnswerId are the same type, we use QuestionId when referring to an entry in
    474   // the local question table (which corresponds to the peer's answer table) and use AnswerId
    475   // to refer to an entry in our answer table (which corresponds to the peer's question table).
    476   // Since all messages in the RPC protocol are defined from the sender's point of view, this
    477   // means that any time we read an ID from a received message, its type should invert.
    478   // TODO(cleanup):  Perhaps we could enforce that in a type-safe way?  Hmm...
    479 
    480   struct Question {
    481     kj::Array<ExportId> paramExports;
    482     // List of exports that were sent in the request.  If the response has `releaseParamCaps` these
    483     // will need to be released.
    484 
    485     kj::Maybe<QuestionRef&> selfRef;
    486     // The local QuestionRef, set to nullptr when it is destroyed, which is also when `Finish` is
    487     // sent.
    488 
    489     bool isAwaitingReturn = false;
    490     // True from when `Call` is sent until `Return` is received.
    491 
    492     bool isTailCall = false;
    493     // Is this a tail call?  If so, we don't expect to receive results in the `Return`.
    494 
    495     bool skipFinish = false;
    496     // If true, don't send a Finish message.
    497 
    498     inline bool operator==(decltype(nullptr)) const {
    499       return !isAwaitingReturn && selfRef == nullptr;
    500     }
    501     inline bool operator!=(decltype(nullptr)) const { return !operator==(nullptr); }
    502   };
    503 
  struct Answer {
    // Entry in the answer table.  Move-only: copying is deleted and moves are defaulted.

    Answer() = default;
    Answer(const Answer&) = delete;
    Answer(Answer&&) = default;
    Answer& operator=(Answer&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

    bool active = false;
    // True from the point when the Call message is received to the point when both the `Finish`
    // message has been received and the `Return` has been sent.

    kj::Maybe<kj::Own<PipelineHook>> pipeline;
    // Send pipelined calls here.  Becomes null as soon as a `Finish` is received.

    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> redirectedResults;
    // For locally-redirected calls (Call.sendResultsTo.yourself), this is a promise for the call
    // result, to be picked up by a subsequent `Return`.

    kj::Maybe<RpcCallContext&> callContext;
    // The call context, if it's still active.  Becomes null when the `Return` message is sent.
    // This object, if non-null, is owned by `asyncOp`.
    // NOTE(review): no `asyncOp` member is visible in this struct -- confirm what owns it.

    kj::Array<ExportId> resultExports;
    // List of exports that were sent in the results.  If the finish has `releaseResultCaps` these
    // will need to be released.
  };
    530 
    531   struct Export {
    532     uint refcount = 0;
    533     // When this reaches 0, drop `clientHook` and free this export.
    534 
    535     kj::Own<ClientHook> clientHook;
    536 
    537     kj::Maybe<kj::Promise<void>> resolveOp = nullptr;
    538     // If this export is a promise (not a settled capability), the `resolveOp` represents the
    539     // ongoing operation to wait for that promise to resolve and then send a `Resolve` message.
    540 
    541     inline bool operator==(decltype(nullptr)) const { return refcount == 0; }
    542     inline bool operator!=(decltype(nullptr)) const { return refcount != 0; }
    543   };
    544 
  struct Import {
    // Entry in the import table.  Move-only: copying is deleted and moves are defaulted.

    Import() = default;
    Import(const Import&) = delete;
    Import(Import&&) = default;
    Import& operator=(Import&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

    kj::Maybe<ImportClient&> importClient;
    // Becomes null when the import is destroyed.

    kj::Maybe<RpcClient&> appClient;
    // Either a copy of importClient, or, in the case of promises, the wrapping PromiseClient.
    // Becomes null when it is discarded *or* when the import is destroyed (e.g. the promise is
    // resolved and the import is no longer needed).

    kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Own<ClientHook>>>> promiseFulfiller;
    // If non-null, the import is a promise.  disconnect() rejects it with the disconnect
    // exception.
  };
    563 
    564   typedef uint32_t EmbargoId;
    565 
    566   struct Embargo {
    567     // For handling the `Disembargo` message when looping back to self.
    568 
    569     kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
    570     // Fulfill this when the Disembargo arrives.
    571 
    572     inline bool operator==(decltype(nullptr)) const { return fulfiller == nullptr; }
    573     inline bool operator!=(decltype(nullptr)) const { return fulfiller != nullptr; }
    574   };
    575 
    576   // =======================================================================================
    577   // OK, now we can define RpcConnectionState's member data.
    578 
  BootstrapFactoryBase& bootstrapFactory;
  kj::Maybe<SturdyRefRestorerBase&> restorer;
  // Both are references passed to the constructor; this class does not own them.

  typedef kj::Own<VatNetworkBase::Connection> Connected;
  typedef kj::Exception Disconnected;
  kj::OneOf<Connected, Disconnected> connection;
  // Once the connection has failed, we drop it and replace it with an exception, which will be
  // thrown from all further calls.

  kj::Canceler canceler;
  // Will be canceled if and when `connection` is changed from `Connected` to `Disconnected`.
  // TODO(cleanup): `Connected` should be a struct that contains the connection and the Canceler,
  //   but that's more refactoring than I want to do right now.

  kj::Own<kj::PromiseFulfiller<DisconnectInfo>> disconnectFulfiller;
  // Fulfilled by disconnect() with the task that shuts the connection down.

  ExportTable<ExportId, Export> exports;
  ExportTable<QuestionId, Question> questions;
  ImportTable<AnswerId, Answer> answers;
  ImportTable<ImportId, Import> imports;
  // The Four Tables!
  // The order of the tables is important for correct destruction.

  std::unordered_map<ClientHook*, ExportId> exportsByCap;
  // Maps already-exported ClientHook objects to their ID in the export table.

  ExportTable<EmbargoId, Embargo> embargoes;
  // There are only four tables.  This definitely isn't a fifth table.  I don't know what you're
  // talking about.

  size_t flowLimit;
  // Set by the constructor and by setFlowLimit(), whose parameter is named `words`.

  size_t callWordsInFlight = 0;

  kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> flowWaiter;
  // If non-null, we're currently blocking incoming messages waiting for callWordsInFlight to drop
  // below flowLimit. Fulfill this to un-block.

  kj::Maybe<kj::Function<kj::String(const kj::Exception&)>&> traceEncoder;
  // Passed to the constructor; fromException() uses such an encoder to fill in the trace field.
  // Reset to null by disconnect(), because the referenced function may be destroyed afterwards.

  kj::TaskSet tasks;
  // Background tasks, including the message loop.  Constructed with `*this` as the error
  // handler, so task failures are reported to taskFailed().
    619 
    620   // =====================================================================================
    621   // ClientHook implementations
    622 
    623   class RpcClient: public ClientHook, public kj::Refcounted {
    624   public:
    625     RpcClient(RpcConnectionState& connectionState)
    626         : connectionState(kj::addRef(connectionState)) {}
    627 
    628     virtual kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor,
    629                                                 kj::Vector<int>& fds) = 0;
    630     // Writes a CapDescriptor referencing this client.  The CapDescriptor must be sent as part of
    631     // the very next message sent on the connection, as it may become invalid if other things
    632     // happen.
    633     //
    634     // If writing the descriptor adds a new export to the export table, or increments the refcount
    635     // on an existing one, then the ID is returned and the caller is responsible for removing it
    636     // later.
    637 
    638     virtual kj::Maybe<kj::Own<ClientHook>> writeTarget(
    639         rpc::MessageTarget::Builder target) = 0;
    640     // Writes the appropriate call target for calls to this capability and returns null.
    641     //
    642     // - OR -
    643     //
    644     // If calls have been redirected to some other local ClientHook, returns that hook instead.
    645     // This can happen if the capability represents a promise that has been resolved.
    646 
    647     virtual kj::Own<ClientHook> getInnermostClient() = 0;
    648     // If this client just wraps some other client -- even if it is only *temporarily* wrapping
    649     // that other client -- return a reference to the other client, transitively.  Otherwise,
    650     // return a new reference to *this.
    651 
    652     virtual void adoptFlowController(kj::Own<RpcFlowController> flowController) {
    653       // Called when a PromiseClient resolves to another RpcClient. If streaming calls were
    654       // outstanding on the old client, we'd like to keep using the same FlowController on the new
    655       // client, so as to keep the flow steady.
    656 
    657       if (this->flowController == nullptr) {
    658         // We don't have any existing flowController so we can adopt this one, yay!
    659         this->flowController = kj::mv(flowController);
    660       } else {
    661         // Apparently, there is an existing flowController. This is an unusual scenario: Apparently
    662         // we had two stream capabilities, we were streaming to both of them, and they later
    663         // resolved to the same capability. This probably never happens because streaming use cases
    664         // normally call for there to be only one client. But, it's certainly possible, and we need
    665         // to handle it. We'll do the conservative thing and just make sure that all the calls
    666         // finish. This may mean we'll over-buffer temporarily; oh well.
    667         connectionState->tasks.add(flowController->waitAllAcked().attach(kj::mv(flowController)));
    668       }
    669     }
    670 
    671     // implements ClientHook -----------------------------------------
    672 
    673     Request<AnyPointer, AnyPointer> newCall(
    674         uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
    675       return newCallNoIntercept(interfaceId, methodId, sizeHint);
    676     }
    677 
    678     Request<AnyPointer, AnyPointer> newCallNoIntercept(
    679         uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) {
    680       if (!connectionState->connection.is<Connected>()) {
    681         return newBrokenRequest(kj::cp(connectionState->connection.get<Disconnected>()), sizeHint);
    682       }
    683 
    684       auto request = kj::heap<RpcRequest>(
    685           *connectionState, *connectionState->connection.get<Connected>(),
    686           sizeHint, kj::addRef(*this));
    687       auto callBuilder = request->getCall();
    688 
    689       callBuilder.setInterfaceId(interfaceId);
    690       callBuilder.setMethodId(methodId);
    691 
    692       auto root = request->getRoot();
    693       return Request<AnyPointer, AnyPointer>(root, kj::mv(request));
    694     }
    695 
    696     VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
    697                                 kj::Own<CallContextHook>&& context) override {
    698       return callNoIntercept(interfaceId, methodId, kj::mv(context));
    699     }
    700 
    701     VoidPromiseAndPipeline callNoIntercept(
    702         uint64_t interfaceId, uint16_t methodId, kj::Own<CallContextHook>&& context) {
    703       // call() is layered on the request API: the incoming params are copied into a fresh
    704       // outgoing request, which is then issued as a direct tail call of `context`.
    705 
    706       auto incoming = context->getParams();
    707       auto outgoing = newCallNoIntercept(interfaceId, methodId, incoming.targetSize());
    708       outgoing.set(incoming);
    709 
    710       // The params have been copied into the new request, so release the originals.
    711       context->releaseParams();
    712       // Cancellation can, and should, be propagated to the forwarded call.
    713       context->allowCancellation();
    714       return context->directTailCall(RequestHook::from(kj::mv(outgoing)));
    715     }
    716 
    717     kj::Own<ClientHook> addRef() override {
    718       return kj::Own<ClientHook>(kj::addRef(*this));  // another counted reference to this hook
    719     }
    720     const void* getBrand() override {
    721       return static_cast<const void*>(connectionState.get());  // brand == owning connection
    722     }
    723 
    724     kj::Own<RpcConnectionState> connectionState;
    725 
    726     kj::Maybe<kj::Own<RpcFlowController>> flowController;
    727     // Becomes non-null the first time a streaming call is made on this capability.
    728   };
    729 
    730   class ImportClient final: public RpcClient {
    731     // ClientHook for a capability that is tracked by an entry in our import table.
    732 
    733   public:
    734     ImportClient(RpcConnectionState& connectionState, ImportId importId,
    735                  kj::Maybe<kj::AutoCloseFd> fd)
    736         : RpcClient(connectionState), importId(importId), fd(kj::mv(fd)) {}
    737 
    738     ~ImportClient() noexcept(false) {
    739       unwindDetector.catchExceptionsIfUnwinding([this]() {
    740         // Unregister from the import table, unless the entry no longer refers to this object.
    741         KJ_IF_MAYBE(entry, connectionState->imports.find(importId)) {
    742           KJ_IF_MAYBE(registered, entry->importClient) {
    743             if (registered == this) {
    744               connectionState->imports.erase(importId);
    745             }
    746           }
    747         }
    748 
    749         // There is nothing to release without remote references or without a live connection.
    750         if (remoteRefcount == 0 || !connectionState->connection.is<Connected>()) {
    751           return;
    752         }
    753         // Give back, in a single Release message, every reference the peer has sent us.
    754         auto outgoing = connectionState->connection.get<Connected>()->newOutgoingMessage(
    755             messageSizeHint<rpc::Release>());
    756         auto release = outgoing->getBody().initAs<rpc::Message>().initRelease();
    757         release.setId(importId);
    758         release.setReferenceCount(remoteRefcount);
    759         outgoing->send();
    760       });
    761     }
    762 
    763     void setFdIfMissing(kj::Maybe<kj::AutoCloseFd> newFd) {
    764       // Adopts `newFd` only when no descriptor is attached yet.
    765       if (fd == nullptr) fd = kj::mv(newFd);
    766     }
    767 
    768     void addRemoteRef() {
    769       // Counts one more reference to this import received from the peer.
    770       remoteRefcount += 1;
    771     }
    772 
    773     kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor,
    774                                         kj::Vector<int>& fds) override {
    775       descriptor.setReceiverHosted(importId);  // named by its import ID; `fds` is not touched
    776       return nullptr;
    777     }
    778 
    779     kj::Maybe<kj::Own<ClientHook>> writeTarget(rpc::MessageTarget::Builder target) override {
    780       target.setImportedCap(importId);  // calls are addressed by import ID
    781       return nullptr;
    782     }
    783 
    784     kj::Own<ClientHook> getInnermostClient() override {
    785       return kj::addRef(*this);  // this object is its own innermost client
    786     }
    787 
    788     // implements ClientHook -----------------------------------------
    789 
    790     kj::Maybe<ClientHook&> getResolved() override {
    791       return nullptr;  // never reports a resolution
    792     }
    793 
    794     kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
    795       return nullptr;  // not a promise
    796     }
    797 
    798     kj::Maybe<int> getFd() override {
    799       return fd.map([](kj::AutoCloseFd& attached) { return attached.get(); });
    800     }
    801 
    802   private:
    803     ImportId importId;
    804     kj::Maybe<kj::AutoCloseFd> fd;
    805 
    806     uint remoteRefcount = 0;
    807     // How many times the peer has sent us this import; reported back in the Release message.
    808 
    809     kj::UnwindDetector unwindDetector;
    810   };
    811 
    812   class PipelineClient final: public RpcClient {
    813     // ClientHook for a pipelined promise: a capability named by a question ID plus a path
    814     // (`ops`) into that question's answer.  Always wrapped in PromiseClient.
    815   public:
    816     PipelineClient(RpcConnectionState& connectionState,
    817                    kj::Own<QuestionRef>&& questionRef, kj::Array<PipelineOp>&& ops)
    818         : RpcClient(connectionState), questionRef(kj::mv(questionRef)), ops(kj::mv(ops)) {}
    819 
    820     kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor,
    821                                         kj::Vector<int>& fds) override {
    822       // Point the descriptor at the promised answer: question ID plus the pipeline path.
    823       auto answer = descriptor.initReceiverAnswer();
    824       answer.setQuestionId(questionRef->getId());
    825       answer.adoptTransform(
    826           fromPipelineOps(Orphanage::getForMessageContaining(descriptor), ops));
    827       return nullptr;
    828     }
    829 
    830     kj::Maybe<kj::Own<ClientHook>> writeTarget(rpc::MessageTarget::Builder target) override {
    831       // Same addressing as writeDescriptor(), but expressed as a message target.
    832       auto answer = target.initPromisedAnswer();
    833       answer.setQuestionId(questionRef->getId());
    834       answer.adoptTransform(fromPipelineOps(Orphanage::getForMessageContaining(answer), ops));
    835       return nullptr;
    836     }
    837 
    838     kj::Own<ClientHook> getInnermostClient() override {
    839       return kj::addRef(*this);  // this object is its own innermost client
    840     }
    841 
    842     // implements ClientHook -----------------------------------------
    843 
    844     kj::Maybe<ClientHook&> getResolved() override {
    845       return nullptr;  // never reports a resolution
    846     }
    847 
    848     kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
    849       return nullptr;  // resolution is handled by the wrapping PromiseClient
    850     }
    851 
    852     kj::Maybe<int> getFd() override {
    853       return nullptr;  // no file descriptor is ever attached
    854     }
    855 
    856   private:
    857     kj::Own<QuestionRef> questionRef;
    858     kj::Array<PipelineOp> ops;
    859   };
    860 
    861   class PromiseClient final: public RpcClient {
    862     // A ClientHook that initially wraps one client (in practice, an ImportClient or a
    863     // PipelineClient) and then, later on, redirects to some other client.
    864 
    865   public:
    866     PromiseClient(RpcConnectionState& connectionState,
    867                   kj::Own<RpcClient> initial,
    868                   kj::Promise<kj::Own<ClientHook>> eventual,
    869                   kj::Maybe<ImportId> importId)
    870         : RpcClient(connectionState),
    871           cap(kj::mv(initial)),
    872           importId(importId),
    873           fork(eventual.then(
    874               [this](kj::Own<ClientHook>&& resolution) {
    875                 return resolve(kj::mv(resolution));
    876               }, [this](kj::Exception&& exception) {
    877                 return resolve(newBrokenCap(kj::mv(exception)));  // failure resolves to a broken cap
    878               }).catch_([&](kj::Exception&& e) {
    879                 // Make any exceptions thrown from resolve() go to the connection's TaskSet which
    880                 // will cause the connection to be terminated.
    881                 connectionState.tasks.add(kj::cp(e));  // the constructor parameter, not the member
    882                 return newBrokenCap(kj::mv(e));
    883               }).fork()) {}
    884     // Create a client that starts out forwarding all calls to `initial` but, once `eventual`
    885     // resolves, will forward there instead.
    886 
    887     ~PromiseClient() noexcept(false) {
    888       KJ_IF_MAYBE(id, importId) {
    889         // This object is representing an import promise.  That means the import table may still
    890         // contain a pointer back to it.  Remove that pointer.  Note that we have to verify that
    891         // the import still exists and the pointer still points back to this object because this
    892         // object may actually outlive the import.
    893         KJ_IF_MAYBE(import, connectionState->imports.find(*id)) {
    894           KJ_IF_MAYBE(c, import->appClient) {
    895             if (c == this) {
    896               import->appClient = nullptr;
    897             }
    898           }
    899         }
    900       }
    901     }
    902 
    903     kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor,
    904                                         kj::Vector<int>& fds) override {
    905       receivedCall = true;  // consulted by resolve() when deciding whether to disembargo
    906       return connectionState->writeDescriptor(*cap, descriptor, fds);  // describe current target
    907     }
    908 
    909     kj::Maybe<kj::Own<ClientHook>> writeTarget(
    910         rpc::MessageTarget::Builder target) override {
    911       receivedCall = true;
    912       return connectionState->writeTarget(*cap, target);  // address the current target
    913     }
    914 
    915     kj::Own<ClientHook> getInnermostClient() override {
    916       receivedCall = true;
    917       return connectionState->getInnermostClient(*cap);
    918     }
    919 
    920     void adoptFlowController(kj::Own<RpcFlowController> flowController) override {
    921       if (cap->getBrand() == connectionState.get()) {
    922         // Pass the flow controller on to our inner cap.
    923         kj::downcast<RpcClient>(*cap).adoptFlowController(kj::mv(flowController));
    924       } else {
    925         // We resolved to a capability that isn't another RPC capability. We should simply make
    926         // sure that all the calls complete.
    927         connectionState->tasks.add(flowController->waitAllAcked().attach(kj::mv(flowController)));
    928       }
    929     }
    930 
    931     // implements ClientHook -----------------------------------------
    932 
    933     Request<AnyPointer, AnyPointer> newCall(
    934         uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
    935       receivedCall = true;
    936 
    937       // IMPORTANT: We must call our superclass's version of newCall(), NOT cap->newCall(), because
    938       //   the Request object we create needs to check at send() time whether the promise has
    939       //   resolved and, if so, redirect to the new target.
    940       return RpcClient::newCall(interfaceId, methodId, sizeHint);
    941     }
    942 
    943     VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
    944                                 kj::Own<CallContextHook>&& context) override {
    945       receivedCall = true;
    946       return cap->call(interfaceId, methodId, kj::mv(context));  // goes to the current target
    947     }
    948 
    949     kj::Maybe<ClientHook&> getResolved() override {
    950       if (isResolved()) {
    951         return *cap;  // after resolve(), `cap` is the replacement
    952       } else {
    953         return nullptr;
    954       }
    955     }
    956 
    957     kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
    958       return fork.addBranch();  // each caller gets its own branch of the resolution promise
    959     }
    960 
    961     kj::Maybe<int> getFd() override {
    962       if (isResolved()) {
    963         return cap->getFd();
    964       } else {
    965         // In theory, before resolution, the ImportClient for the promise could have an FD
    966         // attached, if the promise itself was presented with an attached FD. However, we can't
    967         // really return that one here because it may be closed when we get the Resolve message
    968         // later. In theory we could have the PromiseClient itself take ownership of an FD that
    969         // arrived attached to a promise cap, but the use case for that is questionable. I'm
    970         // keeping it simple for now.
    971         return nullptr;
    972       }
    973     }
    974 
    975   private:
    976     kj::Own<ClientHook> cap;  // current forwarding target; replaced once by resolve()
    977 
    978     kj::Maybe<ImportId> importId;  // when set, the destructor clears the import table's back-pointer
    979     kj::ForkedPromise<kj::Own<ClientHook>> fork;  // resolution promise built in the constructor
    980 
    981     bool receivedCall = false;  // set by every call/write method above; read by resolve()
    982 
    983     enum {
    984       UNRESOLVED,
    985       // Not resolved at all yet.
    986 
    987       REMOTE,
    988       // Resolved to a settled capability on this same connection; null/error count as BROKEN.
    989 
    990       REFLECTED,
    991       // Remote promise resolved to one of our own exports.
    992 
    993       MERGED,
    994       // Remote promise resolved to another remote promise which itself wasn't resolved yet, so we
    995       // merged them. In this case, `cap` is guaranteed to point to another PromiseClient.
    996 
    997       BROKEN
    998       // Resolved to null or error.
    999     } resolutionType = UNRESOLVED;
   1000 
   1001     inline bool isResolved() {
   1002       return resolutionType != UNRESOLVED;
   1003     }
   1004 
   1005     kj::Promise<kj::Own<ClientHook>> resolve(kj::Own<ClientHook> replacement) {
   1006       KJ_DASSERT(!isResolved());  // resolve() runs at most once per PromiseClient
   1007 
   1008       const void* replacementBrand = replacement->getBrand();
   1009       bool isSameConnection = replacementBrand == connectionState.get();
   1010       if (isSameConnection) {
   1011         // We resolved to some other RPC capability hosted by the same peer.
   1012         KJ_IF_MAYBE(promise, replacement->whenMoreResolved()) {
   1013           // We resolved to another remote promise. If *that* promise eventually resolves back
   1014           // to us, we'll need a disembargo. Possibilities:
   1015           // 1. The other promise hasn't resolved at all yet. In that case we can simply set its
   1016           //    `receivedCall` flag and let it handle the disembargo later.
   1017           // 2. The other promise has received a Resolve message and decided to initiate a
   1018           //    disembargo which it is still waiting for. In that case we will certainly also need
   1019           //    a disembargo for the same reason that the other promise did. And, we can't simply
   1020           //    wait for their disembargo; we need to start a new one of our own.
   1021           // 3. The other promise has resolved already (with or without a disembargo). In this
   1022           //    case we should treat it as if we resolved directly to the other promise's result,
   1023           //    possibly requiring a disembargo under the same conditions.
   1024 
   1025           // We know the other object is a PromiseClient because it's the only ClientHook
   1026           // type in the RPC implementation which returns non-null for `whenMoreResolved()`.
   1027           PromiseClient* other = &kj::downcast<PromiseClient>(*replacement);
   1028           while (other->resolutionType == MERGED) {
   1029             // There's no need to resolve to a thing that's just going to resolve to another thing.
   1030             replacement = other->cap->addRef();
   1031             other = &kj::downcast<PromiseClient>(*replacement);
   1032 
   1033             // Note that replacementBrand is unchanged since we'd only merge with other
   1034             // PromiseClients on the same connection.
   1035             KJ_DASSERT(replacement->getBrand() == replacementBrand);
   1036           }
   1037 
   1038           if (other->isResolved()) {
   1039             // The other capability resolved already. If it determined that it resolved as
   1040             // reflected, then we determine the same.
   1041             resolutionType = other->resolutionType;
   1042           } else {
   1043             // The other capability hasn't resolved yet, so we can safely merge with it and do a
   1044             // single combined disembargo if needed later.
   1045             other->receivedCall = other->receivedCall || receivedCall;
   1046             resolutionType = MERGED;
   1047           }
   1048         } else {
   1049           resolutionType = REMOTE;
   1050         }
   1051       } else {
   1052         if (replacementBrand == &ClientHook::NULL_CAPABILITY_BRAND ||
   1053             replacementBrand == &ClientHook::BROKEN_CAPABILITY_BRAND) {
   1054           // We don't consider null or broken capabilities as "reflected" because they may have
   1055           // been communicated to us literally as a null pointer or an exception on the wire,
   1056           // rather than as a reference to one of our exports, in which case a disembargo won't
   1057           // work. But also, call ordering is completely irrelevant with these so there's no need
   1058           // to disembargo anyway.
   1059           resolutionType = BROKEN;
   1060         } else {
   1061           resolutionType = REFLECTED;
   1062         }
   1063       }
   1064 
   1065       // Every branch above ends by setting resolutionType to something other than UNRESOLVED.
   1066       KJ_DASSERT(isResolved());
   1067 
   1068       // If the original capability was used for streaming calls, it will have a
   1069       // `flowController` that might still be shepherding those calls. We'll need to make sure
   1070       // that it doesn't get thrown away. Note that we know that *cap is an RpcClient because
   1071       // resolve() is only called once and our constructor required that the initial capability
   1072       // is an RpcClient.
   1073       KJ_IF_MAYBE(f, kj::downcast<RpcClient>(*cap).flowController) {
   1074         if (isSameConnection) {
   1075           // The new target is on the same connection. It would make a lot of sense to keep using
   1076           // the same flow controller if possible.
   1077           kj::downcast<RpcClient>(*replacement).adoptFlowController(kj::mv(*f));
   1078         } else {
   1079           // The new target is something else. The best we can do is wait for the controller to
   1080           // drain. New calls will be flow-controlled in a new way without knowing about the old
   1081           // controller.
   1082           connectionState->tasks.add(f->get()->waitAllAcked().attach(kj::mv(*f)));
   1083         }
   1084       }
   1085 
   1086       if (resolutionType == REFLECTED && receivedCall &&
   1087           connectionState->connection.is<Connected>()) {
   1088         // The new capability is hosted locally, not on the remote machine.  And, we had made calls
   1089         // to the promise.  We need to make sure those calls echo back to us before we allow new
   1090         // calls to go directly to the local capability, so we need to set a local embargo and send
   1091         // a `Disembargo` to echo through the peer.
   1092 
   1093         auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
   1094             messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
   1095 
   1096         auto disembargo = message->getBody().initAs<rpc::Message>().initDisembargo();
   1097 
   1098         {
   1099           auto redirect = connectionState->writeTarget(*cap, disembargo.initTarget());
   1100           KJ_ASSERT(redirect == nullptr,
   1101                     "Original promise target should always be from this RPC connection.");
   1102         }
   1103 
   1104         EmbargoId embargoId;
   1105         Embargo& embargo = connectionState->embargoes.next(embargoId);  // allocates the ID too
   1106 
   1107         disembargo.getContext().setSenderLoopback(embargoId);
   1108 
   1109         auto paf = kj::newPromiseAndFulfiller<void>();
   1110         embargo.fulfiller = kj::mv(paf.fulfiller);
   1111 
   1112         // Make a promise which resolves to `replacement` as soon as the `Disembargo` comes back.
   1113         auto embargoPromise = paf.promise.then([replacement = kj::mv(replacement)]() mutable {
   1114           return kj::mv(replacement);
   1115         });
   1116 
   1117         // We need to queue up calls in the meantime, so we'll resolve ourselves to a local promise
   1118         // client instead.
   1119         replacement = newLocalPromiseClient(kj::mv(embargoPromise));
   1120 
   1121         // Send the `Disembargo`.
   1122         message->send();
   1123       }
   1124 
   1125       cap = replacement->addRef();  // from now on everything is forwarded to the replacement
   1126 
   1127       return kj::mv(replacement);
   1128     }
   1129   };
   1130 
   1131   kj::Maybe<ExportId> writeDescriptor(ClientHook& cap, rpc::CapDescriptor::Builder descriptor,
   1132                                       kj::Vector<int>& fds) {
   1133     // Fills in `descriptor` to describe `cap` to the peer; returns the export ID used, if any.
   1134 
   1135     // Follow already-resolved promises down to the capability that currently backs `cap`.
   1136     ClientHook* target = &cap;
   1137     while (true) {
   1138       KJ_IF_MAYBE(next, target->getResolved()) {
   1139         target = next;
   1140       } else {
   1141         break;
   1142       }
   1143     }
   1144 
   1145     KJ_IF_MAYBE(fd, target->getFd()) {
   1146       // An attached FD is appended to `fds`; the descriptor records its index in that list.
   1147       descriptor.setAttachedFd(fds.size());
   1148       fds.add(kj::mv(*fd));
   1149     }
   1150 
   1151     if (target->getBrand() == this) {
   1152       // An RPC client belonging to this very connection knows how to describe itself.
   1153       return kj::downcast<RpcClient>(*target).writeDescriptor(descriptor, fds);
   1154     }
   1155     auto known = exportsByCap.find(target);
   1156     if (known != exportsByCap.end()) {
   1157       // Already exported: reuse the table entry and count one more reference.
   1158       const ExportId existingId = known->second;
   1159       auto& entry = KJ_ASSERT_NONNULL(exports.find(existingId));
   1160       ++entry.refcount;
   1161       if (entry.resolveOp != nullptr) {
   1162         descriptor.setSenderPromise(existingId);
   1163       } else {
   1164         descriptor.setSenderHosted(existingId);
   1165       }
   1166       return existingId;
   1167     }
   1168 
   1169     // First export of this capability: claim a fresh table slot and index it by hook.
   1170     ExportId freshId;
   1171     auto& entry = exports.next(freshId);
   1172     exportsByCap[target] = freshId;
   1173     entry.refcount = 1;
   1174     entry.clientHook = target->addRef();
   1175 
   1176     KJ_IF_MAYBE(pending, target->whenMoreResolved()) {
   1177       // Still a promise.  Arrange for the `Resolve` message to be sent later.
   1178       entry.resolveOp = resolveExportedPromise(freshId, kj::mv(*pending));
   1179       descriptor.setSenderPromise(freshId);
   1180     } else {
   1181       descriptor.setSenderHosted(freshId);
   1182     }
   1183   return freshId; }
   1184 
   1185   kj::Array<ExportId> writeDescriptors(kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> capTable,
   1186                                        rpc::Payload::Builder payload, kj::Vector<int>& fds) {
   1187     // Writes a CapDescriptor for every `capTable` slot; returns the export IDs that were used.
   1188     if (capTable.size() == 0) {
   1189       // Skip initCapTable(0): it would still allocate a 1-word tag, which we'd like to avoid.
   1190       return nullptr;
   1191     }
   1192     auto descriptors = payload.initCapTable(capTable.size());
   1193     kj::Vector<ExportId> exportIds(capTable.size());
   1194     for (uint slot = 0; slot < capTable.size(); ++slot) {
   1195       KJ_IF_MAYBE(hook, capTable[slot]) {
   1196         KJ_IF_MAYBE(id, writeDescriptor(**hook, descriptors[slot], fds)) {
   1197           exportIds.add(*id);
   1198         }
   1199       } else {
   1200         descriptors[slot].setNone();  // empty slot in the cap table
   1201       }
   1202     }
   1203     return exportIds.releaseAsArray();
   1204   }
   1205 
   1206   kj::Maybe<kj::Own<ClientHook>> writeTarget(ClientHook& cap, rpc::MessageTarget::Builder target) {
   1207     // Prepares `target` for a call to `cap`.  When the call should travel over this connection,
   1208     // `target` is filled in and nullptr is returned.  Otherwise the result is a `ClientHook` and
   1209     // the caller is expected to hand the call over to that hook instead.
   1210     //
   1211     // A non-null result mostly shows up when `cap` is a promise that resolved only recently:
   1212     // the application may have begun building a request while the promise still pointed across
   1213     // this connection, and the promise then resolved to point somewhere else before the request
   1214     // was sent, so the request now has to be redirected to the new target.
   1215 
   1216     if (cap.getBrand() != this) {
   1217       // Not one of this connection's clients, so the caller must forward the call to it.
   1218       return cap.addRef();
   1219     }
   1220 
   1221     return kj::downcast<RpcClient>(cap).writeTarget(target);
   1222   }
   1223 
   1224   kj::Own<ClientHook> getInnermostClient(ClientHook& client) {
   1225     // Returns a reference to the hook that ultimately backs `client`.
   1226     // First step through every wrapper that reports an existing resolution.
   1227     ClientHook* current = &client;
   1228     while (true) {
   1229       KJ_IF_MAYBE(next, current->getResolved()) {
   1230         current = next;
   1231       } else {
   1232         break;
   1233       }
   1234     }
   1235 
   1236     // Clients of this connection may unwrap further; any other hook is returned as it is.
   1237     if (current->getBrand() != this) return current->addRef();
   1238     return kj::downcast<RpcClient>(*current).getInnermostClient();
   1239   }
   1240 
   1241   kj::Promise<void> resolveExportedPromise(
   1242       ExportId exportId, kj::Promise<kj::Own<ClientHook>>&& promise) {
   1243     // Implements exporting of a promise that has been exported under `exportId`.  Waits for
   1244     // `promise` to settle, then sends the peer the matching `Resolve` message: the capability it
   1245     // resolved to, or the exception it was rejected with.
   1246 
   1247     return promise.then(
   1248         [this,exportId](kj::Own<ClientHook>&& resolution) -> kj::Promise<void> {
   1249       // Successful resolution.
   1250       KJ_ASSERT(connection.is<Connected>(),
   1251                 "Resolving export should have been canceled on disconnect.") {
   1252         return kj::READY_NOW;
   1253       }
   1254 
   1255       // Get the innermost ClientHook backing the resolved client.  This includes traversing
   1256       // PromiseClients that haven't resolved yet to their underlying ImportClient or
   1257       // PipelineClient, so that we get a remote promise that might resolve later.  This is
   1258       // important to make sure that if the peer sends a `Disembargo` back to us, it bounces back
   1259       // correctly instead of going to the result of some future resolution.  See the documentation
   1260       // for `Disembargo` in `rpc.capnp`.
   1261       resolution = getInnermostClient(*resolution);
   1262 
   1263       // Update the export table to point at this object instead.  We know that our entry in the
   1264       // export table is still live because when it is destroyed the asynchronous resolution task
   1265       // (i.e. this code) is canceled.
   1266       auto& exp = KJ_ASSERT_NONNULL(exports.find(exportId));
   1267       exportsByCap.erase(exp.clientHook);
   1268       exp.clientHook = kj::mv(resolution);
   1269 
   1270       if (exp.clientHook->getBrand() != this) {
   1271         // We're resolving to a local capability.  If we're resolving to a promise, we might be
   1272         // able to reuse our export table entry and avoid sending a message.
   1273         KJ_IF_MAYBE(promise, exp.clientHook->whenMoreResolved()) {
   1274           // We're replacing a promise with another local promise.  In this case, we might actually
   1275           // be able to just reuse the existing export table entry to represent the new promise --
   1276           // unless it already has an entry.  Let's check.
   1277           auto insertResult = exportsByCap.insert(std::make_pair(exp.clientHook.get(), exportId));
   1278           if (insertResult.second) {
   1279             // The new promise was not already in the table, therefore the existing export table
   1280             // entry has now been repurposed to represent it.  There is no need to send a resolve
   1281             // message at all.  We do, however, have to start resolving the next promise.
   1282             return resolveExportedPromise(exportId, kj::mv(*promise));
   1283           }
   1284         }
   1285       }
   1286 
   1287       // OK, we have to send a `Resolve` message.
   1288       auto message = connection.get<Connected>()->newOutgoingMessage(
   1289           messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::CapDescriptor>() + 16);
   1290       auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
   1291       resolve.setPromiseId(exportId);
   1292       kj::Vector<int> fds;
   1293       writeDescriptor(*exp.clientHook, resolve.initCap(), fds);
   1294       message->setFds(fds.releaseAsArray());
   1295       message->send();
   1296       return kj::READY_NOW;
   1297     }, [this,exportId](kj::Exception&& exception) {
   1298       // The promise was rejected: report the exception as the resolution.  Like the success
   1299       // path, this must not touch the connection unless it is still in the Connected state.
   1300       KJ_ASSERT(connection.is<Connected>(),
   1301                 "Resolving export should have been canceled on disconnect.") {
   1302         return;
   1303       }
   1304       auto message = connection.get<Connected>()->newOutgoingMessage(
   1305           messageSizeHint<rpc::Resolve>() + exceptionSizeHint(exception) + 8);
   1306       auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
   1307       resolve.setPromiseId(exportId);
   1308       fromException(exception, resolve.initException());
   1309       message->send();
   1310     }).eagerlyEvaluate([this](kj::Exception&& exception) {
   1311       // Put the exception on the TaskSet which will cause the connection to be terminated.
   1312       tasks.add(kj::mv(exception));
   1313     });
   1314   }
   1315 
   1316   void fromException(const kj::Exception& exception, rpc::Exception::Builder builder) {
   1317     _::fromException(exception, builder, traceEncoder);  // forwards, adding our traceEncoder
   1318   }
   1319 
   1320   // =====================================================================================
   1321   // Interpreting CapDescriptor
   1322 
   1323   kj::Own<ClientHook> import(ImportId importId, bool isPromise, kj::Maybe<kj::AutoCloseFd> fd) {
   1324     // Handles a capability received from the peer under `importId` and returns the hook that
   1325     // represents it.
   1326 
   1327     auto& entry = imports[importId];
   1328 
   1329     // Reuse the ImportClient registered for this ID, or register a new one.
   1330     kj::Own<ImportClient> importClient;
   1331     KJ_IF_MAYBE(existing, entry.importClient) {
   1332       importClient = kj::addRef(*existing);
   1333 
   1334       // The same import can be introduced several times.  If an earlier introduction carried no
   1335       // FD but this one does, attach it now.  That can happen when the earlier message held too
   1336       // many FDs and went over the per-message limit: that message may not have cared whether
   1337       // its FDs arrived, while this one really does, and it would be bad for the earlier one to
   1338       // block delivery just because it happened to reference the same capability.
   1339       importClient->setFdIfMissing(kj::mv(fd));
   1340     } else {
   1341       importClient = kj::refcounted<ImportClient>(*this, importId, kj::mv(fd));
   1342       entry.importClient = *importClient;
   1343     }
   1344 
   1345     // We just received a copy of this import ID, so the remote refcount has gone up.
   1346     importClient->addRemoteRef();
   1347 
   1348     if (!isPromise) {
   1349       // Not a promise: the ImportClient itself is recorded as the application-facing client.
   1350       entry.appClient = *importClient;
   1351       return kj::mv(importClient);
   1352     }
   1353 
   1354     // A promise needs a PromiseClient around the import, unless one already exists.
   1355     KJ_IF_MAYBE(wrapper, entry.appClient) {
   1356       // Use the existing one.
   1357       return kj::addRef(*wrapper);
   1358     }
   1359 
   1360     // Stash the fulfiller in the table entry; its promise is what the PromiseClient waits on.
   1361     auto paf = kj::newPromiseAndFulfiller<kj::Own<ClientHook>>();
   1362     entry.promiseFulfiller = kj::mv(paf.fulfiller);
   1363 
   1364     // Make sure the import is not destroyed while this promise exists.
   1365     auto eventual = paf.promise.attach(kj::addRef(*importClient));
   1366 
   1367     // Wrap the import in a PromiseClient and record it as the application-facing client.
   1368     auto promiseClient = kj::refcounted<PromiseClient>(
   1369         *this, kj::mv(importClient), kj::mv(eventual), importId);
   1370     entry.appClient = *promiseClient;
   1371     return kj::mv(promiseClient);
   1372   }
   1373 
   1374   class TribbleRaceBlocker: public ClientHook, public kj::Refcounted {
   1375     // Hack to work around a problem that arises during the Tribble 4-way Race Condition as
   1376     // described in rpc.capnp in the documentation for the `Disembargo` message.
   1377     //
   1378     // Consider a remote promise that is resolved by a `Resolve` message. PromiseClient::resolve()
   1379     // is eventually called and given the `ClientHook` for the resolution. Imagine that the
   1380     // `ClientHook` it receives turns out to be an `ImportClient`. There are two ways this could
   1381     // have happened:
   1382     //
   1383     // 1. The `Resolve` message contained a `CapDescriptor` of type `senderHosted`, naming an entry
   1384     //    in the sender's export table, and the `ImportClient` refers to the corresponding slot on
   1385     //    the receiver's import table. In this case, no embargo is needed, because messages to the
   1386     //    resolved location traverse the same path as messages to the promise would have.
   1387     //
   1388     // 2. The `Resolve` message contained a `CapDescriptor` of type `receiverHosted`, naming an
   1389     //    entry in the receiver's export table. That entry just happened to contain an
   1390     //    `ImportClient` refering back to the sender. This specifically happens when the entry
   1391     //    in question had previously itself referred to a promise, and that promise has since
   1392     //    resolved to a remote capability, at which point the export table entry was replaced by
   1393     //    the appropriate `ImportClient` representing that. Presumably, the peer *did not yet know*
   1394     //    about this resolution, which is why it sent a `receiverHosted` pointing to something that
   1395     //    reflects back to the sender, rather than sending `senderHosted` in the first place.
   1396     //
   1397     //    In this case, an embargo *is* required, because peer may still be reflecting messages
   1398     //    sent to this promise back to us. In fact, the peer *must* continue reflecting messages,
   1399     //    even when it eventually learns that the eventual destination is one of its own
   1400     //    capabilities, due to the Tribble 4-way Race Condition rule.
   1401     //
   1402     //    Since this case requires an embargo, somehow PromiseClient::resolve() must be able to
   1403     //    distinguish it from the case (1). One solution would be for us to pass some extra flag
   1404     //    all the way from where the `Resolve` messages is received to `PromiseClient::resolve()`.
   1405     //    That solution is reasonably easy in the `Resolve` case, but gets notably more difficult
   1406     //    in the case of `Return`s, which also resolve promises and are subject to all the same
   1407     //    problems. In the case of a `Return`, some non-RPC-specific code is involved in the
   1408     //    resolution, making it harder to pass along a flag.
   1409     //
   1410     //    Instead, we use this hack: When we read an entry in the export table and discover that
   1411     //    it actually contains an `ImportClient` or a `PipelineClient` reflecting back over our
   1412     //    own connection, then we wrap it in a `TribbleRaceBlocker`. This wrapper prevents
   1413     //    `PromiseClient` from recognizing the capability as being remote, so it instead treats it
   1414     //    as local. That causes it to set up an embargo as desired.
   1415     //
   1416     // TODO(perf): This actually blocks further promise resolution in the case where the
   1417     //   ImportClient or PipelineClient itself ends up being yet another promise that resolves
   1418     //   back over the connection again. What we probably really need to do here is, instead of
   1419     //   placing `ImportClient` or `PipelineClient` on the export table, place a special type there
   1420     //   that both knows what to do with future incoming messages to that export ID, but also knows
   1421     //   what to do when that export is the subject of a `Resolve`.
   1422 
   1423   public:
   1424     TribbleRaceBlocker(kj::Own<ClientHook> inner): inner(kj::mv(inner)) {}
   1425 
   1426     Request<AnyPointer, AnyPointer> newCall(
   1427         uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
   1428       return inner->newCall(interfaceId, methodId, sizeHint);
   1429     }
   1430     VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
   1431                                 kj::Own<CallContextHook>&& context) override {
   1432       return inner->call(interfaceId, methodId, kj::mv(context));
   1433     }
   1434     kj::Maybe<ClientHook&> getResolved() override {
   1435       // We always wrap either PipelineClient or ImportClient, both of which return null for this
   1436       // anyway.
   1437       return nullptr;
   1438     }
   1439     kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
   1440       // We always wrap either PipelineClient or ImportClient, both of which return null for this
   1441       // anyway.
   1442       return nullptr;
   1443     }
   1444     kj::Own<ClientHook> addRef() override {
   1445       return kj::addRef(*this);
   1446     }
   1447     const void* getBrand() override {
   1448       return nullptr;
   1449     }
   1450     kj::Maybe<int> getFd() override {
   1451       return inner->getFd();
   1452     }
   1453 
   1454   private:
   1455     kj::Own<ClientHook> inner;
   1456   };
   1457 
   1458   kj::Maybe<kj::Own<ClientHook>> receiveCap(rpc::CapDescriptor::Reader descriptor,
   1459                                             kj::ArrayPtr<kj::AutoCloseFd> fds) {
   1460     uint fdIndex = descriptor.getAttachedFd();
   1461     kj::Maybe<kj::AutoCloseFd> fd;
   1462     if (fdIndex < fds.size() && fds[fdIndex] != nullptr) {
   1463       fd = kj::mv(fds[fdIndex]);
   1464     }
   1465 
   1466     switch (descriptor.which()) {
   1467       case rpc::CapDescriptor::NONE:
   1468         return nullptr;
   1469 
   1470       case rpc::CapDescriptor::SENDER_HOSTED:
   1471         return import(descriptor.getSenderHosted(), false, kj::mv(fd));
   1472       case rpc::CapDescriptor::SENDER_PROMISE:
   1473         return import(descriptor.getSenderPromise(), true, kj::mv(fd));
   1474 
   1475       case rpc::CapDescriptor::RECEIVER_HOSTED:
   1476         KJ_IF_MAYBE(exp, exports.find(descriptor.getReceiverHosted())) {
   1477           auto result = exp->clientHook->addRef();
   1478           if (result->getBrand() == this) {
   1479             result = kj::refcounted<TribbleRaceBlocker>(kj::mv(result));
   1480           }
   1481           return kj::mv(result);
   1482         } else {
   1483           return newBrokenCap("invalid 'receiverHosted' export ID");
   1484         }
   1485 
   1486       case rpc::CapDescriptor::RECEIVER_ANSWER: {
   1487         auto promisedAnswer = descriptor.getReceiverAnswer();
   1488 
   1489         KJ_IF_MAYBE(answer, answers.find(promisedAnswer.getQuestionId())) {
   1490           if (answer->active) {
   1491             KJ_IF_MAYBE(pipeline, answer->pipeline) {
   1492               KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
   1493                 auto result = pipeline->get()->getPipelinedCap(*ops);
   1494                 if (result->getBrand() == this) {
   1495                   result = kj::refcounted<TribbleRaceBlocker>(kj::mv(result));
   1496                 }
   1497                 return kj::mv(result);
   1498               } else {
   1499                 return newBrokenCap("unrecognized pipeline ops");
   1500               }
   1501             }
   1502           }
   1503         }
   1504 
   1505         return newBrokenCap("invalid 'receiverAnswer'");
   1506       }
   1507 
   1508       case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
   1509         // We don't support third-party caps, so use the vine instead.
   1510         return import(descriptor.getThirdPartyHosted().getVineId(), false, kj::mv(fd));
   1511 
   1512       default:
   1513         KJ_FAIL_REQUIRE("unknown CapDescriptor type") { break; }
   1514         return newBrokenCap("unknown CapDescriptor type");
   1515     }
   1516   }
   1517 
   1518   kj::Array<kj::Maybe<kj::Own<ClientHook>>> receiveCaps(List<rpc::CapDescriptor>::Reader capTable,
   1519                                                         kj::ArrayPtr<kj::AutoCloseFd> fds) {
   1520     auto result = kj::heapArrayBuilder<kj::Maybe<kj::Own<ClientHook>>>(capTable.size());
   1521     for (auto cap: capTable) {
   1522       result.add(receiveCap(cap, fds));
   1523     }
   1524     return result.finish();
   1525   }
   1526 
   1527   // =====================================================================================
   1528   // RequestHook/PipelineHook/ResponseHook implementations
   1529 
   1530   class QuestionRef: public kj::Refcounted {
   1531     // A reference to an entry on the question table.  Used to detect when the `Finish` message
   1532     // can be sent.
   1533 
   1534   public:
   1535     inline QuestionRef(
   1536         RpcConnectionState& connectionState, QuestionId id,
   1537         kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller)
   1538         : connectionState(kj::addRef(connectionState)), id(id), fulfiller(kj::mv(fulfiller)) {}
   1539 
   1540     ~QuestionRef() noexcept {
   1541       // Contrary to KJ style, we declare this destructor `noexcept` because if anything in here
   1542       // throws (without being caught) we're probably in pretty bad shape and going to be crashing
   1543       // later anyway. Better to abort now.
   1544 
   1545       auto& question = KJ_ASSERT_NONNULL(
   1546           connectionState->questions.find(id), "Question ID no longer on table?");
   1547 
   1548       // Send the "Finish" message (if the connection is not already broken).
   1549       if (connectionState->connection.is<Connected>() && !question.skipFinish) {
   1550         KJ_IF_MAYBE(e, kj::runCatchingExceptions([&]() {
   1551           auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
   1552               messageSizeHint<rpc::Finish>());
   1553           auto builder = message->getBody().getAs<rpc::Message>().initFinish();
   1554           builder.setQuestionId(id);
   1555           // If we're still awaiting a return, then this request is being canceled, and we're going
   1556           // to ignore any capabilities in the return message, so set releaseResultCaps true. If we
   1557           // already received the return, then we've already built local proxies for the caps and
   1558           // will send Release messages when those are destroyed.
   1559           builder.setReleaseResultCaps(question.isAwaitingReturn);
   1560           message->send();
   1561         })) {
   1562           connectionState->disconnect(kj::mv(*e));
   1563         }
   1564       }
   1565 
   1566       // Check if the question has returned and, if so, remove it from the table.
   1567       // Remove question ID from the table.  Must do this *after* sending `Finish` to ensure that
   1568       // the ID is not re-allocated before the `Finish` message can be sent.
   1569       if (question.isAwaitingReturn) {
   1570         // Still waiting for return, so just remove the QuestionRef pointer from the table.
   1571         question.selfRef = nullptr;
   1572       } else {
   1573         // Call has already returned, so we can now remove it from the table.
   1574         connectionState->questions.erase(id, question);
   1575       }
   1576     }
   1577 
   1578     inline QuestionId getId() const { return id; }
   1579 
   1580     void fulfill(kj::Own<RpcResponse>&& response) {
   1581       fulfiller->fulfill(kj::mv(response));
   1582     }
   1583 
   1584     void fulfill(kj::Promise<kj::Own<RpcResponse>>&& promise) {
   1585       fulfiller->fulfill(kj::mv(promise));
   1586     }
   1587 
   1588     void reject(kj::Exception&& exception) {
   1589       fulfiller->reject(kj::mv(exception));
   1590     }
   1591 
   1592   private:
   1593     kj::Own<RpcConnectionState> connectionState;
   1594     QuestionId id;
   1595     kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller;
   1596   };
   1597 
   1598   class RpcRequest final: public RequestHook {
   1599   public:
   1600     RpcRequest(RpcConnectionState& connectionState, VatNetworkBase::Connection& connection,
   1601                kj::Maybe<MessageSize> sizeHint, kj::Own<RpcClient>&& target)
   1602         : connectionState(kj::addRef(connectionState)),
   1603           target(kj::mv(target)),
   1604           message(connection.newOutgoingMessage(
   1605               firstSegmentSize(sizeHint, messageSizeHint<rpc::Call>() +
   1606                   sizeInWords<rpc::Payload>() + MESSAGE_TARGET_SIZE_HINT))),
   1607           callBuilder(message->getBody().getAs<rpc::Message>().initCall()),
   1608           paramsBuilder(capTable.imbue(callBuilder.getParams().getContent())) {}
   1609 
   1610     inline AnyPointer::Builder getRoot() {
   1611       return paramsBuilder;
   1612     }
   1613     inline rpc::Call::Builder getCall() {
   1614       return callBuilder;
   1615     }
   1616 
   1617     RemotePromise<AnyPointer> send() override {
   1618       if (!connectionState->connection.is<Connected>()) {
   1619         // Connection is broken.
   1620         const kj::Exception& e = connectionState->connection.get<Disconnected>();
   1621         return RemotePromise<AnyPointer>(
   1622             kj::Promise<Response<AnyPointer>>(kj::cp(e)),
   1623             AnyPointer::Pipeline(newBrokenPipeline(kj::cp(e))));
   1624       }
   1625 
   1626       KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
   1627         // Whoops, this capability has been redirected while we were building the request!
   1628         // We'll have to make a new request and do a copy.  Ick.
   1629 
   1630         auto replacement = redirect->get()->newCall(
   1631             callBuilder.getInterfaceId(), callBuilder.getMethodId(), paramsBuilder.targetSize());
   1632         replacement.set(paramsBuilder);
   1633         return replacement.send();
   1634       } else {
   1635         auto sendResult = sendInternal(false);
   1636 
   1637         auto forkedPromise = sendResult.promise.fork();
   1638 
   1639         // The pipeline must get notified of resolution before the app does to maintain ordering.
   1640         auto pipeline = kj::refcounted<RpcPipeline>(
   1641             *connectionState, kj::mv(sendResult.questionRef), forkedPromise.addBranch());
   1642 
   1643         auto appPromise = forkedPromise.addBranch().then(
   1644             [=](kj::Own<RpcResponse>&& response) {
   1645               auto reader = response->getResults();
   1646               return Response<AnyPointer>(reader, kj::mv(response));
   1647             });
   1648 
   1649         return RemotePromise<AnyPointer>(
   1650             kj::mv(appPromise),
   1651             AnyPointer::Pipeline(kj::mv(pipeline)));
   1652       }
   1653     }
   1654 
   1655     kj::Promise<void> sendStreaming() override {
   1656       if (!connectionState->connection.is<Connected>()) {
   1657         // Connection is broken.
   1658         return kj::cp(connectionState->connection.get<Disconnected>());
   1659       }
   1660 
   1661       KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
   1662         // Whoops, this capability has been redirected while we were building the request!
   1663         // We'll have to make a new request and do a copy.  Ick.
   1664 
   1665         auto replacement = redirect->get()->newCall(
   1666             callBuilder.getInterfaceId(), callBuilder.getMethodId(), paramsBuilder.targetSize());
   1667         replacement.set(paramsBuilder);
   1668         return RequestHook::from(kj::mv(replacement))->sendStreaming();
   1669       } else {
   1670         return sendStreamingInternal(false);
   1671       }
   1672     }
   1673 
   1674     struct TailInfo {
   1675       QuestionId questionId;
   1676       kj::Promise<void> promise;
   1677       kj::Own<PipelineHook> pipeline;
   1678     };
   1679 
   1680     kj::Maybe<TailInfo> tailSend() {
   1681       // Send the request as a tail call.
   1682       //
   1683       // Returns null if for some reason a tail call is not possible and the caller should fall
   1684       // back to using send() and copying the response.
   1685 
   1686       SendInternalResult sendResult;
   1687 
   1688       if (!connectionState->connection.is<Connected>()) {
   1689         // Disconnected; fall back to a regular send() which will fail appropriately.
   1690         return nullptr;
   1691       }
   1692 
   1693       KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
   1694         // Whoops, this capability has been redirected while we were building the request!
   1695         // Fall back to regular send().
   1696         return nullptr;
   1697       } else {
   1698         sendResult = sendInternal(true);
   1699       }
   1700 
   1701       auto promise = sendResult.promise.then([](kj::Own<RpcResponse>&& response) {
   1702         // Response should be null if `Return` handling code is correct.
   1703         KJ_ASSERT(!response) { break; }
   1704       });
   1705 
   1706       QuestionId questionId = sendResult.questionRef->getId();
   1707 
   1708       auto pipeline = kj::refcounted<RpcPipeline>(*connectionState, kj::mv(sendResult.questionRef));
   1709 
   1710       return TailInfo { questionId, kj::mv(promise), kj::mv(pipeline) };
   1711     }
   1712 
   1713     const void* getBrand() override {
   1714       return connectionState.get();
   1715     }
   1716 
   1717   private:
   1718     kj::Own<RpcConnectionState> connectionState;
   1719 
   1720     kj::Own<RpcClient> target;
   1721     kj::Own<OutgoingRpcMessage> message;
   1722     BuilderCapabilityTable capTable;
   1723     rpc::Call::Builder callBuilder;
   1724     AnyPointer::Builder paramsBuilder;
   1725 
   1726     struct SendInternalResult {
   1727       kj::Own<QuestionRef> questionRef;
   1728       kj::Promise<kj::Own<RpcResponse>> promise = nullptr;
   1729     };
   1730 
   1731     struct SetupSendResult: public SendInternalResult {
   1732       QuestionId questionId;
   1733       Question& question;
   1734 
   1735       SetupSendResult(SendInternalResult&& super, QuestionId questionId, Question& question)
   1736           : SendInternalResult(kj::mv(super)), questionId(questionId), question(question) {}
   1737       // TODO(cleanup): This constructor is implicit in C++17.
   1738     };
   1739 
   1740     SetupSendResult setupSend(bool isTailCall) {
   1741       // Build the cap table.
   1742       kj::Vector<int> fds;
   1743       auto exports = connectionState->writeDescriptors(
   1744           capTable.getTable(), callBuilder.getParams(), fds);
   1745       message->setFds(fds.releaseAsArray());
   1746 
   1747       // Init the question table.  Do this after writing descriptors to avoid interference.
   1748       QuestionId questionId;
   1749       auto& question = connectionState->questions.next(questionId);
   1750       question.isAwaitingReturn = true;
   1751       question.paramExports = kj::mv(exports);
   1752       question.isTailCall = isTailCall;
   1753 
   1754       // Make the QuentionRef and result promise.
   1755       SendInternalResult result;
   1756       auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
   1757       result.questionRef = kj::refcounted<QuestionRef>(
   1758           *connectionState, questionId, kj::mv(paf.fulfiller));
   1759       question.selfRef = *result.questionRef;
   1760       result.promise = paf.promise.attach(kj::addRef(*result.questionRef));
   1761 
   1762       return { kj::mv(result), questionId, question };
   1763     }
   1764 
   1765     SendInternalResult sendInternal(bool isTailCall) {
   1766       auto result = setupSend(isTailCall);
   1767 
   1768       // Finish and send.
   1769       callBuilder.setQuestionId(result.questionId);
   1770       if (isTailCall) {
   1771         callBuilder.getSendResultsTo().setYourself();
   1772       }
   1773       KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
   1774         KJ_CONTEXT("sending RPC call",
   1775            callBuilder.getInterfaceId(), callBuilder.getMethodId());
   1776         message->send();
   1777       })) {
   1778         // We can't safely throw the exception from here since we've already modified the question
   1779         // table state. We'll have to reject the promise instead.
   1780         result.question.isAwaitingReturn = false;
   1781         result.question.skipFinish = true;
   1782         connectionState->releaseExports(result.question.paramExports);
   1783         result.questionRef->reject(kj::mv(*exception));
   1784       }
   1785 
   1786       // Send and return.
   1787       return kj::mv(result);
   1788     }
   1789 
   1790     kj::Promise<void> sendStreamingInternal(bool isTailCall) {
   1791       auto setup = setupSend(isTailCall);
   1792 
   1793       // Finish and send.
   1794       callBuilder.setQuestionId(setup.questionId);
   1795       if (isTailCall) {
   1796         callBuilder.getSendResultsTo().setYourself();
   1797       }
   1798       kj::Promise<void> flowPromise = nullptr;
   1799       KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
   1800         KJ_CONTEXT("sending RPC call",
   1801            callBuilder.getInterfaceId(), callBuilder.getMethodId());
   1802         RpcFlowController* flow;
   1803         KJ_IF_MAYBE(f, target->flowController) {
   1804           flow = *f;
   1805         } else {
   1806           flow = target->flowController.emplace(
   1807               connectionState->connection.get<Connected>()->newStream());
   1808         }
   1809         flowPromise = flow->send(kj::mv(message), setup.promise.ignoreResult());
   1810       })) {
   1811         // We can't safely throw the exception from here since we've already modified the question
   1812         // table state. We'll have to reject the promise instead.
   1813         setup.question.isAwaitingReturn = false;
   1814         setup.question.skipFinish = true;
   1815         setup.questionRef->reject(kj::cp(*exception));
   1816         return kj::mv(*exception);
   1817       }
   1818 
   1819       return kj::mv(flowPromise);
   1820     }
   1821   };
   1822 
   1823   class RpcPipeline final: public PipelineHook, public kj::Refcounted {
   1824   public:
   1825     RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef,
   1826                 kj::Promise<kj::Own<RpcResponse>>&& redirectLaterParam)
   1827         : connectionState(kj::addRef(connectionState)),
   1828           redirectLater(redirectLaterParam.fork()),
   1829           resolveSelfPromise(KJ_ASSERT_NONNULL(redirectLater).addBranch().then(
   1830               [this](kj::Own<RpcResponse>&& response) {
   1831                 resolve(kj::mv(response));
   1832               }, [this](kj::Exception&& exception) {
   1833                 resolve(kj::mv(exception));
   1834               }).eagerlyEvaluate([&](kj::Exception&& e) {
   1835                 // Make any exceptions thrown from resolve() go to the connection's TaskSet which
   1836                 // will cause the connection to be terminated.
   1837                 connectionState.tasks.add(kj::mv(e));
   1838               })) {
   1839       // Construct a new RpcPipeline.
   1840 
   1841       state.init<Waiting>(kj::mv(questionRef));
   1842     }
   1843 
   1844     RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef)
   1845         : connectionState(kj::addRef(connectionState)),
   1846           resolveSelfPromise(nullptr) {
   1847       // Construct a new RpcPipeline that is never expected to resolve.
   1848 
   1849       state.init<Waiting>(kj::mv(questionRef));
   1850     }
   1851 
   1852     // implements PipelineHook ---------------------------------------
   1853 
   1854     kj::Own<PipelineHook> addRef() override {
   1855       return kj::addRef(*this);
   1856     }
   1857 
   1858     kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
   1859       auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size());
   1860       for (auto& op: ops) {
   1861         copy.add(op);
   1862       }
   1863       return getPipelinedCap(copy.finish());
   1864     }
   1865 
   1866     kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override {
   1867       return clientMap.findOrCreate(ops.asPtr(), [&]() {
   1868         if (state.is<Waiting>()) {
   1869           // Wrap a PipelineClient in a PromiseClient.
   1870           auto pipelineClient = kj::refcounted<PipelineClient>(
   1871               *connectionState, kj::addRef(*state.get<Waiting>()), kj::heapArray(ops.asPtr()));
   1872 
   1873           KJ_IF_MAYBE(r, redirectLater) {
   1874             auto resolutionPromise = r->addBranch().then(
   1875                 [ops = kj::heapArray(ops.asPtr())](kj::Own<RpcResponse>&& response) {
   1876                   return response->getResults().getPipelinedCap(kj::mv(ops));
   1877                 });
   1878 
   1879             return kj::HashMap<kj::Array<PipelineOp>, kj::Own<ClientHook>>::Entry {
   1880               kj::mv(ops),
   1881               kj::refcounted<PromiseClient>(
   1882                   *connectionState, kj::mv(pipelineClient), kj::mv(resolutionPromise), nullptr)
   1883             };
   1884           } else {
   1885             // Oh, this pipeline will never get redirected, so just return the PipelineClient.
   1886             return kj::HashMap<kj::Array<PipelineOp>, kj::Own<ClientHook>>::Entry {
   1887               kj::mv(ops), kj::mv(pipelineClient)
   1888             };
   1889           }
   1890         } else if (state.is<Resolved>()) {
   1891           auto pipelineClient = state.get<Resolved>()->getResults().getPipelinedCap(ops);
   1892           return kj::HashMap<kj::Array<PipelineOp>, kj::Own<ClientHook>>::Entry {
   1893             kj::mv(ops), kj::mv(pipelineClient)
   1894           };
   1895         } else {
   1896           return kj::HashMap<kj::Array<PipelineOp>, kj::Own<ClientHook>>::Entry {
   1897             kj::mv(ops), newBrokenCap(kj::cp(state.get<Broken>()))
   1898           };
   1899         }
   1900       })->addRef();
   1901     }
   1902 
   1903   private:
   1904     kj::Own<RpcConnectionState> connectionState;
   1905     kj::Maybe<kj::ForkedPromise<kj::Own<RpcResponse>>> redirectLater;
   1906 
   1907     typedef kj::Own<QuestionRef> Waiting;
   1908     typedef kj::Own<RpcResponse> Resolved;
   1909     typedef kj::Exception Broken;
   1910     kj::OneOf<Waiting, Resolved, Broken> state;
   1911 
   1912     kj::HashMap<kj::Array<PipelineOp>, kj::Own<ClientHook>> clientMap;
   1913     // See QueuedPipeline::clientMap in capability.c++ for a discussion of why we must memoize
   1914     // the results of getPipelinedCap(). RpcPipeline has a similar problem when a capability we
   1915     // return is later subject to an embargo. It's important that the embargo is correctly applied
   1916     // across all calls to the same capability.
   1917 
   1918     // Keep this last, because the continuation uses *this, so it should be destroyed first to
   1919     // ensure the continuation is not still running.
   1920     kj::Promise<void> resolveSelfPromise;
   1921 
   1922     void resolve(kj::Own<RpcResponse>&& response) {
   1923       KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
   1924       state.init<Resolved>(kj::mv(response));
   1925     }
   1926 
   1927     void resolve(const kj::Exception&& exception) {
   1928       KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
   1929       state.init<Broken>(kj::mv(exception));
   1930     }
   1931   };
   1932 
   1933   class RpcResponse: public ResponseHook {
   1934   public:
   1935     virtual AnyPointer::Reader getResults() = 0;
   1936     virtual kj::Own<RpcResponse> addRef() = 0;
   1937   };
   1938 
   1939   class RpcResponseImpl final: public RpcResponse, public kj::Refcounted {
   1940   public:
   1941     RpcResponseImpl(RpcConnectionState& connectionState,
   1942                     kj::Own<QuestionRef>&& questionRef,
   1943                     kj::Own<IncomingRpcMessage>&& message,
   1944                     kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTableArray,
   1945                     AnyPointer::Reader results)
   1946         : connectionState(kj::addRef(connectionState)),
   1947           message(kj::mv(message)),
   1948           capTable(kj::mv(capTableArray)),
   1949           reader(capTable.imbue(results)),
   1950           questionRef(kj::mv(questionRef)) {}
   1951 
   1952     AnyPointer::Reader getResults() override {
   1953       return reader;
   1954     }
   1955 
   1956     kj::Own<RpcResponse> addRef() override {
   1957       return kj::addRef(*this);
   1958     }
   1959 
   1960   private:
   1961     kj::Own<RpcConnectionState> connectionState;
   1962     kj::Own<IncomingRpcMessage> message;
   1963     ReaderCapabilityTable capTable;
   1964     AnyPointer::Reader reader;
   1965     kj::Own<QuestionRef> questionRef;
   1966   };
   1967 
   1968   // =====================================================================================
   1969   // CallContextHook implementation
   1970 
   1971   class RpcServerResponse {
   1972   public:
   1973     virtual AnyPointer::Builder getResultsBuilder() = 0;
   1974   };
   1975 
   1976   class RpcServerResponseImpl final: public RpcServerResponse {
   1977   public:
   1978     RpcServerResponseImpl(RpcConnectionState& connectionState,
   1979                           kj::Own<OutgoingRpcMessage>&& message,
   1980                           rpc::Payload::Builder payload)
   1981         : connectionState(connectionState),
   1982           message(kj::mv(message)),
   1983           payload(payload) {}
   1984 
   1985     AnyPointer::Builder getResultsBuilder() override {
   1986       return capTable.imbue(payload.getContent());
   1987     }
   1988 
   1989     kj::Maybe<kj::Array<ExportId>> send() {
   1990       // Send the response and return the export list.  Returns nullptr if there were no caps.
   1991       // (Could return a non-null empty array if there were caps but none of them were exports.)
   1992 
   1993       // Build the cap table.
   1994       auto capTable = this->capTable.getTable();
   1995       kj::Vector<int> fds;
   1996       auto exports = connectionState.writeDescriptors(capTable, payload, fds);
   1997       message->setFds(fds.releaseAsArray());
   1998 
   1999       // Capabilities that we are returning are subject to embargos. See `Disembargo` in rpc.capnp.
   2000       // As explained there, in order to deal with the Tribble 4-way race condition, we need to
   2001       // make sure that if we're returning any remote promises, that we ignore any subsequent
   2002       // resolution of those promises for the purpose of pipelined requests on this answer. Luckily,
   2003       // we can modify the cap table in-place.
   2004       for (auto& slot: capTable) {
   2005         KJ_IF_MAYBE(cap, slot) {
   2006           slot = connectionState.getInnermostClient(**cap);
   2007         }
   2008       }
   2009 
   2010       message->send();
   2011       if (capTable.size() == 0) {
   2012         return nullptr;
   2013       } else {
   2014         return kj::mv(exports);
   2015       }
   2016     }
   2017 
   2018   private:
   2019     RpcConnectionState& connectionState;
   2020     kj::Own<OutgoingRpcMessage> message;
   2021     BuilderCapabilityTable capTable;
   2022     rpc::Payload::Builder payload;
   2023   };
   2024 
   2025   class LocallyRedirectedRpcResponse final
   2026       : public RpcResponse, public RpcServerResponse, public kj::Refcounted{
   2027   public:
   2028     LocallyRedirectedRpcResponse(kj::Maybe<MessageSize> sizeHint)
   2029         : message(sizeHint.map([](MessageSize size) { return size.wordCount; })
   2030                           .orDefault(SUGGESTED_FIRST_SEGMENT_WORDS)) {}
   2031 
   2032     AnyPointer::Builder getResultsBuilder() override {
   2033       return message.getRoot<AnyPointer>();
   2034     }
   2035 
   2036     AnyPointer::Reader getResults() override {
   2037       return message.getRoot<AnyPointer>();
   2038     }
   2039 
   2040     kj::Own<RpcResponse> addRef() override {
   2041       return kj::addRef(*this);
   2042     }
   2043 
   2044   private:
   2045     MallocMessageBuilder message;
   2046   };
   2047 
   2048   class RpcCallContext final: public CallContextHook, public kj::Refcounted {
   2049   public:
   2050     RpcCallContext(RpcConnectionState& connectionState, AnswerId answerId,
   2051                    kj::Own<IncomingRpcMessage>&& request,
   2052                    kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTableArray,
   2053                    const AnyPointer::Reader& params,
   2054                    bool redirectResults, kj::Own<kj::PromiseFulfiller<void>>&& cancelFulfiller,
   2055                    uint64_t interfaceId, uint16_t methodId)
   2056         : connectionState(kj::addRef(connectionState)),
   2057           answerId(answerId),
   2058           interfaceId(interfaceId),
   2059           methodId(methodId),
   2060           requestSize(request->sizeInWords()),
   2061           request(kj::mv(request)),
   2062           paramsCapTable(kj::mv(capTableArray)),
   2063           params(paramsCapTable.imbue(params)),
   2064           returnMessage(nullptr),
   2065           redirectResults(redirectResults),
   2066           cancelFulfiller(kj::mv(cancelFulfiller)) {
   2067       connectionState.callWordsInFlight += requestSize;
   2068     }
   2069 
   2070     ~RpcCallContext() noexcept(false) {
   2071       if (isFirstResponder()) {
   2072         // We haven't sent a return yet, so we must have been canceled.  Send a cancellation return.
   2073         unwindDetector.catchExceptionsIfUnwinding([&]() {
   2074           // Don't send anything if the connection is broken.
   2075           bool shouldFreePipeline = true;
   2076           if (connectionState->connection.is<Connected>()) {
   2077             auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
   2078                 messageSizeHint<rpc::Return>() + sizeInWords<rpc::Payload>());
   2079             auto builder = message->getBody().initAs<rpc::Message>().initReturn();
   2080 
   2081             builder.setAnswerId(answerId);
   2082             builder.setReleaseParamCaps(false);
   2083 
   2084             if (redirectResults) {
   2085               // The reason we haven't sent a return is because the results were sent somewhere
   2086               // else.
   2087               builder.setResultsSentElsewhere();
   2088 
   2089               // The pipeline could still be valid and in-use in this case.
   2090               shouldFreePipeline = false;
   2091             } else {
   2092               builder.setCanceled();
   2093             }
   2094 
   2095             message->send();
   2096           }
   2097 
   2098           cleanupAnswerTable(nullptr, shouldFreePipeline);
   2099         });
   2100       }
   2101     }
   2102 
   2103     kj::Own<RpcResponse> consumeRedirectedResponse() {
   2104       KJ_ASSERT(redirectResults);
   2105 
   2106       if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
   2107 
   2108       // Note that the context needs to keep its own reference to the response so that it doesn't
   2109       // get GC'd until the PipelineHook drops its reference to the context.
   2110       return kj::downcast<LocallyRedirectedRpcResponse>(*KJ_ASSERT_NONNULL(response)).addRef();
   2111     }
   2112 
   2113     void sendReturn() {
   2114       KJ_ASSERT(!redirectResults);
   2115 
   2116       // Avoid sending results if canceled so that we don't have to figure out whether or not
   2117       // `releaseResultCaps` was set in the already-received `Finish`.
   2118       if (!(cancellationFlags & CANCEL_REQUESTED) && isFirstResponder()) {
   2119         KJ_ASSERT(connectionState->connection.is<Connected>(),
   2120                   "Cancellation should have been requested on disconnect.") {
   2121           return;
   2122         }
   2123 
   2124         if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
   2125 
   2126         returnMessage.setAnswerId(answerId);
   2127         returnMessage.setReleaseParamCaps(false);
   2128 
   2129         kj::Maybe<kj::Array<ExportId>> exports;
   2130         KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
   2131           // Debug info incase send() fails due to overside message.
   2132           KJ_CONTEXT("returning from RPC call", interfaceId, methodId);
   2133           exports = kj::downcast<RpcServerResponseImpl>(*KJ_ASSERT_NONNULL(response)).send();
   2134         })) {
   2135           responseSent = false;
   2136           sendErrorReturn(kj::mv(*exception));
   2137           return;
   2138         }
   2139 
   2140         KJ_IF_MAYBE(e, exports) {
   2141           // Caps were returned, so we can't free the pipeline yet.
   2142           cleanupAnswerTable(kj::mv(*e), false);
   2143         } else {
   2144           // No caps in the results, therefore the pipeline is irrelevant.
   2145           cleanupAnswerTable(nullptr, true);
   2146         }
   2147       }
   2148     }
   2149     void sendErrorReturn(kj::Exception&& exception) {
   2150       KJ_ASSERT(!redirectResults);
   2151       if (isFirstResponder()) {
   2152         if (connectionState->connection.is<Connected>()) {
   2153           auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
   2154               messageSizeHint<rpc::Return>() + exceptionSizeHint(exception));
   2155           auto builder = message->getBody().initAs<rpc::Message>().initReturn();
   2156 
   2157           builder.setAnswerId(answerId);
   2158           builder.setReleaseParamCaps(false);
   2159           connectionState->fromException(exception, builder.initException());
   2160 
   2161           message->send();
   2162         }
   2163 
   2164         // Do not allow releasing the pipeline because we want pipelined calls to propagate the
   2165         // exception rather than fail with a "no such field" exception.
   2166         cleanupAnswerTable(nullptr, false);
   2167       }
   2168     }
   2169     void sendRedirectReturn() {
   2170       KJ_ASSERT(redirectResults);
   2171 
   2172       if (isFirstResponder()) {
   2173         auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
   2174             messageSizeHint<rpc::Return>());
   2175         auto builder = message->getBody().initAs<rpc::Message>().initReturn();
   2176 
   2177         builder.setAnswerId(answerId);
   2178         builder.setReleaseParamCaps(false);
   2179         builder.setResultsSentElsewhere();
   2180 
   2181         message->send();
   2182 
   2183         cleanupAnswerTable(nullptr, false);
   2184       }
   2185     }
   2186 
   2187     void requestCancel() {
   2188       // Hints that the caller wishes to cancel this call.  At the next time when cancellation is
   2189       // deemed safe, the RpcCallContext shall send a canceled Return -- or if it never becomes
   2190       // safe, the RpcCallContext will send a normal return when the call completes.  Either way
   2191       // the RpcCallContext is now responsible for cleaning up the entry in the answer table, since
   2192       // a Finish message was already received.
   2193 
   2194       bool previouslyAllowedButNotRequested = cancellationFlags == CANCEL_ALLOWED;
   2195       cancellationFlags |= CANCEL_REQUESTED;
   2196 
   2197       if (previouslyAllowedButNotRequested) {
   2198         // We just set CANCEL_REQUESTED, and CANCEL_ALLOWED was already set previously.  Initiate
   2199         // the cancellation.
   2200         cancelFulfiller->fulfill();
   2201       }
   2202     }
   2203 
   2204     // implements CallContextHook ------------------------------------
   2205 
   2206     AnyPointer::Reader getParams() override {
   2207       KJ_REQUIRE(request != nullptr, "Can't call getParams() after releaseParams().");
   2208       return params;
   2209     }
   2210     void releaseParams() override {
   2211       request = nullptr;
   2212     }
   2213     AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
   2214       KJ_IF_MAYBE(r, response) {
   2215         return r->get()->getResultsBuilder();
   2216       } else {
   2217         kj::Own<RpcServerResponse> response;
   2218 
   2219         if (redirectResults || !connectionState->connection.is<Connected>()) {
   2220           response = kj::refcounted<LocallyRedirectedRpcResponse>(sizeHint);
   2221         } else {
   2222           auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
   2223               firstSegmentSize(sizeHint, messageSizeHint<rpc::Return>() +
   2224                                sizeInWords<rpc::Payload>()));
   2225           returnMessage = message->getBody().initAs<rpc::Message>().initReturn();
   2226           response = kj::heap<RpcServerResponseImpl>(
   2227               *connectionState, kj::mv(message), returnMessage.getResults());
   2228         }
   2229 
   2230         auto results = response->getResultsBuilder();
   2231         this->response = kj::mv(response);
   2232         return results;
   2233       }
   2234     }
   2235     void setPipeline(kj::Own<PipelineHook>&& pipeline) override {
   2236       KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
   2237         f->get()->fulfill(AnyPointer::Pipeline(kj::mv(pipeline)));
   2238       }
   2239     }
   2240     kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
   2241       auto result = directTailCall(kj::mv(request));
   2242       KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
   2243         f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
   2244       }
   2245       return kj::mv(result.promise);
   2246     }
   2247     ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
   2248       KJ_REQUIRE(response == nullptr,
   2249                  "Can't call tailCall() after initializing the results struct.");
   2250 
   2251       if (request->getBrand() == connectionState.get() && !redirectResults) {
   2252         // The tail call is headed towards the peer that called us in the first place, so we can
   2253         // optimize out the return trip.
   2254 
   2255         KJ_IF_MAYBE(tailInfo, kj::downcast<RpcRequest>(*request).tailSend()) {
   2256           if (isFirstResponder()) {
   2257             if (connectionState->connection.is<Connected>()) {
   2258               auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
   2259                   messageSizeHint<rpc::Return>());
   2260               auto builder = message->getBody().initAs<rpc::Message>().initReturn();
   2261 
   2262               builder.setAnswerId(answerId);
   2263               builder.setReleaseParamCaps(false);
   2264               builder.setTakeFromOtherQuestion(tailInfo->questionId);
   2265 
   2266               message->send();
   2267             }
   2268 
   2269             // There are no caps in our return message, but of course the tail results could have
   2270             // caps, so we must continue to honor pipeline calls (and just bounce them back).
   2271             cleanupAnswerTable(nullptr, false);
   2272           }
   2273           return { kj::mv(tailInfo->promise), kj::mv(tailInfo->pipeline) };
   2274         }
   2275       }
   2276 
   2277       // Just forwarding to another local call.
   2278       auto promise = request->send();
   2279 
   2280       // Wait for response.
   2281       auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
   2282         // Copy the response.
   2283         // TODO(perf):  It would be nice if we could somehow make the response get built in-place
   2284         //   but requires some refactoring.
   2285         getResults(tailResponse.targetSize()).set(tailResponse);
   2286       });
   2287 
   2288       return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
   2289     }
   2290     kj::Promise<AnyPointer::Pipeline> onTailCall() override {
   2291       auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
   2292       tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
   2293       return kj::mv(paf.promise);
   2294     }
   2295     void allowCancellation() override {
   2296       bool previouslyRequestedButNotAllowed = cancellationFlags == CANCEL_REQUESTED;
   2297       cancellationFlags |= CANCEL_ALLOWED;
   2298 
   2299       if (previouslyRequestedButNotAllowed) {
   2300         // We just set CANCEL_ALLOWED, and CANCEL_REQUESTED was already set previously.  Initiate
   2301         // the cancellation.
   2302         cancelFulfiller->fulfill();
   2303       }
   2304     }
   2305     kj::Own<CallContextHook> addRef() override {
   2306       return kj::addRef(*this);
   2307     }
   2308 
   2309   private:
   2310     kj::Own<RpcConnectionState> connectionState;
   2311     AnswerId answerId;
   2312 
   2313     uint64_t interfaceId;
   2314     uint16_t methodId;
   2315     // For debugging.
   2316 
   2317     // Request ---------------------------------------------
   2318 
   2319     size_t requestSize;  // for flow limit purposes
   2320     kj::Maybe<kj::Own<IncomingRpcMessage>> request;
   2321     ReaderCapabilityTable paramsCapTable;
   2322     AnyPointer::Reader params;
   2323 
   2324     // Response --------------------------------------------
   2325 
   2326     kj::Maybe<kj::Own<RpcServerResponse>> response;
   2327     rpc::Return::Builder returnMessage;
   2328     bool redirectResults = false;
   2329     bool responseSent = false;
   2330     kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
   2331 
   2332     // Cancellation state ----------------------------------
   2333 
   2334     enum CancellationFlags {
   2335       CANCEL_REQUESTED = 1,
   2336       CANCEL_ALLOWED = 2
   2337     };
   2338 
   2339     uint8_t cancellationFlags = 0;
   2340     // When both flags are set, the cancellation process will begin.
   2341 
   2342     kj::Own<kj::PromiseFulfiller<void>> cancelFulfiller;
   2343     // Fulfilled when cancellation has been both requested and permitted.  The fulfilled promise is
   2344     // exclusive-joined with the outermost promise waiting on the call return, so fulfilling it
   2345     // cancels that promise.
   2346 
   2347     kj::UnwindDetector unwindDetector;
   2348 
   2349     // -----------------------------------------------------
   2350 
   2351     bool isFirstResponder() {
   2352       if (responseSent) {
   2353         return false;
   2354       } else {
   2355         responseSent = true;
   2356         return true;
   2357       }
   2358     }
   2359 
   2360     void cleanupAnswerTable(kj::Array<ExportId> resultExports, bool shouldFreePipeline) {
   2361       // We need to remove the `callContext` pointer -- which points back to us -- from the
   2362       // answer table.  Or we might even be responsible for removing the entire answer table
   2363       // entry.
   2364 
   2365       if (cancellationFlags & CANCEL_REQUESTED) {
   2366         // Already received `Finish` so it's our job to erase the table entry. We shouldn't have
   2367         // sent results if canceled, so we shouldn't have an export list to deal with.
   2368         KJ_ASSERT(resultExports.size() == 0);
   2369         connectionState->answers.erase(answerId);
   2370       } else {
   2371         // We just have to null out callContext and set the exports.
   2372         auto& answer = connectionState->answers[answerId];
   2373         answer.callContext = nullptr;
   2374         answer.resultExports = kj::mv(resultExports);
   2375 
   2376         if (shouldFreePipeline) {
   2377           // We can free the pipeline early, because we know all pipeline calls are invalid (e.g.
   2378           // because there are no caps in the result to receive pipeline requests).
   2379           KJ_ASSERT(resultExports.size() == 0);
   2380           answer.pipeline = nullptr;
   2381         }
   2382       }
   2383 
   2384       // Also, this is the right time to stop counting the call against the flow limit.
   2385       connectionState->callWordsInFlight -= requestSize;
   2386       connectionState->maybeUnblockFlow();
   2387     }
   2388   };
   2389 
   2390   // =====================================================================================
   2391   // Message handling
   2392 
   2393   void maybeUnblockFlow() {
   2394     if (callWordsInFlight < flowLimit) {
   2395       KJ_IF_MAYBE(w, flowWaiter) {
   2396         w->get()->fulfill();
   2397         flowWaiter = nullptr;
   2398       }
   2399     }
   2400   }
   2401 
   2402   kj::Promise<void> messageLoop() {
   2403     if (!connection.is<Connected>()) {
   2404       return kj::READY_NOW;
   2405     }
   2406 
   2407     if (callWordsInFlight > flowLimit) {
   2408       auto paf = kj::newPromiseAndFulfiller<void>();
   2409       flowWaiter = kj::mv(paf.fulfiller);
   2410       return paf.promise.then([this]() {
   2411         return messageLoop();
   2412       });
   2413     }
   2414 
   2415     return canceler.wrap(connection.get<Connected>()->receiveIncomingMessage()).then(
   2416         [this](kj::Maybe<kj::Own<IncomingRpcMessage>>&& message) {
   2417       KJ_IF_MAYBE(m, message) {
   2418         handleMessage(kj::mv(*m));
   2419         return true;
   2420       } else {
   2421         disconnect(KJ_EXCEPTION(DISCONNECTED, "Peer disconnected."));
   2422         return false;
   2423       }
   2424     }).then([this](bool keepGoing) {
   2425       // No exceptions; continue loop.
   2426       //
   2427       // (We do this in a separate continuation to handle the case where exceptions are
   2428       // disabled.)
   2429       //
   2430       // TODO(perf): We add an evalLater() here so that anything we needed to do in reaction to
   2431       //   the previous message has a chance to complete before the next message is handled. In
   2432       //   paticular, without this, I observed an ordering problem: I saw a case where a `Return`
   2433       //   message was followed by a `Resolve` message, but the `PromiseClient` associated with the
   2434       //   `Resolve` had its `resolve()` method invoked _before_ any `PromiseClient`s associated
   2435       //   with pipelined capabilities resolved by the `Return`. This could lead to an
   2436       //   incorrectly-ordered interaction between `PromiseClient`s when they resolve to each
   2437       //   other. This is probably really a bug in the way `Return`s are handled -- apparently,
   2438       //   resolution of `PromiseClient`s based on returned capabilites does not occur in a
   2439       //   depth-first way, when it should. If we could fix that then we can probably remove this
   2440       //   `evalLater()`. However, the `evalLater()` is not that bad and solves the problem...
   2441       if (keepGoing) tasks.add(kj::evalLater([this]() { return messageLoop(); }));
   2442     });
   2443   }
   2444 
   2445   void handleMessage(kj::Own<IncomingRpcMessage> message) {
   2446     auto reader = message->getBody().getAs<rpc::Message>();
   2447 
   2448     switch (reader.which()) {
   2449       case rpc::Message::UNIMPLEMENTED:
   2450         handleUnimplemented(reader.getUnimplemented());
   2451         break;
   2452 
   2453       case rpc::Message::ABORT:
   2454         handleAbort(reader.getAbort());
   2455         break;
   2456 
   2457       case rpc::Message::BOOTSTRAP:
   2458         handleBootstrap(kj::mv(message), reader.getBootstrap());
   2459         break;
   2460 
   2461       case rpc::Message::CALL:
   2462         handleCall(kj::mv(message), reader.getCall());
   2463         break;
   2464 
   2465       case rpc::Message::RETURN:
   2466         handleReturn(kj::mv(message), reader.getReturn());
   2467         break;
   2468 
   2469       case rpc::Message::FINISH:
   2470         handleFinish(reader.getFinish());
   2471         break;
   2472 
   2473       case rpc::Message::RESOLVE:
   2474         handleResolve(kj::mv(message), reader.getResolve());
   2475         break;
   2476 
   2477       case rpc::Message::RELEASE:
   2478         handleRelease(reader.getRelease());
   2479         break;
   2480 
   2481       case rpc::Message::DISEMBARGO:
   2482         handleDisembargo(reader.getDisembargo());
   2483         break;
   2484 
   2485       default: {
   2486         if (connection.is<Connected>()) {
   2487           auto message = connection.get<Connected>()->newOutgoingMessage(
   2488               firstSegmentSize(reader.totalSize(), messageSizeHint<void>()));
   2489           message->getBody().initAs<rpc::Message>().setUnimplemented(reader);
   2490           message->send();
   2491         }
   2492         break;
   2493       }
   2494     }
   2495   }
   2496 
   2497   void handleUnimplemented(const rpc::Message::Reader& message) {
   2498     switch (message.which()) {
   2499       case rpc::Message::RESOLVE: {
   2500         auto resolve = message.getResolve();
   2501         switch (resolve.which()) {
   2502           case rpc::Resolve::CAP: {
   2503             auto cap = resolve.getCap();
   2504             switch (cap.which()) {
   2505               case rpc::CapDescriptor::NONE:
   2506                 // Nothing to do (but this ought never to happen).
   2507                 break;
   2508               case rpc::CapDescriptor::SENDER_HOSTED:
   2509                 releaseExport(cap.getSenderHosted(), 1);
   2510                 break;
   2511               case rpc::CapDescriptor::SENDER_PROMISE:
   2512                 releaseExport(cap.getSenderPromise(), 1);
   2513                 break;
   2514               case rpc::CapDescriptor::RECEIVER_ANSWER:
   2515               case rpc::CapDescriptor::RECEIVER_HOSTED:
   2516                 // Nothing to do.
   2517                 break;
   2518               case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
   2519                 releaseExport(cap.getThirdPartyHosted().getVineId(), 1);
   2520                 break;
   2521             }
   2522             break;
   2523           }
   2524           case rpc::Resolve::EXCEPTION:
   2525             // Nothing to do.
   2526             break;
   2527         }
   2528         break;
   2529       }
   2530 
   2531       default:
   2532         KJ_FAIL_ASSERT("Peer did not implement required RPC message type.", (uint)message.which());
   2533         break;
   2534     }
   2535   }
   2536 
   2537   void handleAbort(const rpc::Exception::Reader& exception) {
   2538     kj::throwRecoverableException(toException(exception));
   2539   }
   2540 
   2541   // ---------------------------------------------------------------------------
   2542   // Level 0
   2543 
   2544   class SingleCapPipeline: public PipelineHook, public kj::Refcounted {
   2545   public:
   2546     SingleCapPipeline(kj::Own<ClientHook>&& cap)
   2547         : cap(kj::mv(cap)) {}
   2548 
   2549     kj::Own<PipelineHook> addRef() override {
   2550       return kj::addRef(*this);
   2551     }
   2552 
   2553     kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
   2554       if (ops.size() == 0) {
   2555         return cap->addRef();
   2556       } else {
   2557         return newBrokenCap("Invalid pipeline transform.");
   2558       }
   2559     }
   2560 
   2561   private:
   2562     kj::Own<ClientHook> cap;
   2563   };
   2564 
   2565   void handleBootstrap(kj::Own<IncomingRpcMessage>&& message,
   2566                        const rpc::Bootstrap::Reader& bootstrap) {
   2567     AnswerId answerId = bootstrap.getQuestionId();
   2568 
   2569     if (!connection.is<Connected>()) {
   2570       // Disconnected; ignore.
   2571       return;
   2572     }
   2573 
   2574     VatNetworkBase::Connection& conn = *connection.get<Connected>();
   2575     auto response = conn.newOutgoingMessage(
   2576         messageSizeHint<rpc::Return>() + sizeInWords<rpc::CapDescriptor>() + 32);
   2577 
   2578     rpc::Return::Builder ret = response->getBody().getAs<rpc::Message>().initReturn();
   2579     ret.setAnswerId(answerId);
   2580 
   2581     kj::Own<ClientHook> capHook;
   2582     kj::Array<ExportId> resultExports;
   2583     KJ_DEFER(releaseExports(resultExports));  // in case something goes wrong
   2584 
   2585     // Call the restorer and initialize the answer.
   2586     KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
   2587       Capability::Client cap = nullptr;
   2588 
   2589       if (bootstrap.hasDeprecatedObjectId()) {
   2590         KJ_IF_MAYBE(r, restorer) {
   2591           cap = r->baseRestore(bootstrap.getDeprecatedObjectId());
   2592         } else {
   2593           KJ_FAIL_REQUIRE("This vat only supports a bootstrap interface, not the old "
   2594                           "Cap'n-Proto-0.4-style named exports.") { return; }
   2595         }
   2596       } else {
   2597         cap = bootstrapFactory.baseCreateFor(conn.baseGetPeerVatId());
   2598       }
   2599 
   2600       BuilderCapabilityTable capTable;
   2601       auto payload = ret.initResults();
   2602       capTable.imbue(payload.getContent()).setAs<Capability>(kj::mv(cap));
   2603 
   2604       auto capTableArray = capTable.getTable();
   2605       KJ_DASSERT(capTableArray.size() == 1);
   2606       kj::Vector<int> fds;
   2607       resultExports = writeDescriptors(capTableArray, payload, fds);
   2608       response->setFds(fds.releaseAsArray());
   2609       capHook = KJ_ASSERT_NONNULL(capTableArray[0])->addRef();
   2610     })) {
   2611       fromException(*exception, ret.initException());
   2612       capHook = newBrokenCap(kj::mv(*exception));
   2613     }
   2614 
   2615     message = nullptr;
   2616 
   2617     // Add the answer to the answer table for pipelining and send the response.
   2618     auto& answer = answers[answerId];
   2619     KJ_REQUIRE(!answer.active, "questionId is already in use", answerId) {
   2620       return;
   2621     }
   2622 
   2623     answer.resultExports = kj::mv(resultExports);
   2624     answer.active = true;
   2625     answer.pipeline = kj::Own<PipelineHook>(kj::refcounted<SingleCapPipeline>(kj::mv(capHook)));
   2626 
   2627     response->send();
   2628   }
   2629 
   2630   void handleCall(kj::Own<IncomingRpcMessage>&& message, const rpc::Call::Reader& call) {
   2631     kj::Own<ClientHook> capability;
   2632 
   2633     KJ_IF_MAYBE(t, getMessageTarget(call.getTarget())) {
   2634       capability = kj::mv(*t);
   2635     } else {
   2636       // Exception already reported.
   2637       return;
   2638     }
   2639 
   2640     bool redirectResults;
   2641     switch (call.getSendResultsTo().which()) {
   2642       case rpc::Call::SendResultsTo::CALLER:
   2643         redirectResults = false;
   2644         break;
   2645       case rpc::Call::SendResultsTo::YOURSELF:
   2646         redirectResults = true;
   2647         break;
   2648       default:
   2649         KJ_FAIL_REQUIRE("Unsupported `Call.sendResultsTo`.") { return; }
   2650     }
   2651 
   2652     auto payload = call.getParams();
   2653     auto capTableArray = receiveCaps(payload.getCapTable(), message->getAttachedFds());
   2654     auto cancelPaf = kj::newPromiseAndFulfiller<void>();
   2655 
   2656     AnswerId answerId = call.getQuestionId();
   2657 
   2658     auto context = kj::refcounted<RpcCallContext>(
   2659         *this, answerId, kj::mv(message), kj::mv(capTableArray), payload.getContent(),
   2660         redirectResults, kj::mv(cancelPaf.fulfiller),
   2661         call.getInterfaceId(), call.getMethodId());
   2662 
   2663     // No more using `call` after this point, as it now belongs to the context.
   2664 
   2665     {
   2666       auto& answer = answers[answerId];
   2667 
   2668       KJ_REQUIRE(!answer.active, "questionId is already in use") {
   2669         return;
   2670       }
   2671 
   2672       answer.active = true;
   2673       answer.callContext = *context;
   2674     }
   2675 
   2676     auto promiseAndPipeline = startCall(
   2677         call.getInterfaceId(), call.getMethodId(), kj::mv(capability), context->addRef());
   2678 
   2679     // Things may have changed -- in particular if startCall() immediately called
   2680     // context->directTailCall().
   2681 
   2682     {
   2683       auto& answer = answers[answerId];
   2684 
   2685       answer.pipeline = kj::mv(promiseAndPipeline.pipeline);
   2686 
   2687       if (redirectResults) {
   2688         auto resultsPromise = promiseAndPipeline.promise.then(
   2689             kj::mvCapture(context, [](kj::Own<RpcCallContext>&& context) {
   2690               return context->consumeRedirectedResponse();
   2691             }));
   2692 
   2693         // If the call that later picks up `redirectedResults` decides to discard it, we need to
   2694         // make sure our call is not itself canceled unless it has called allowCancellation().
   2695         // So we fork the promise and join one branch with the cancellation promise, in order to
   2696         // hold on to it.
   2697         auto forked = resultsPromise.fork();
   2698         answer.redirectedResults = forked.addBranch();
   2699 
   2700         cancelPaf.promise
   2701             .exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}))
   2702             .detach([](kj::Exception&&) {});
   2703       } else {
   2704         // Hack:  Both the success and error continuations need to use the context.  We could
   2705         //   refcount, but both will be destroyed at the same time anyway.
   2706         RpcCallContext* contextPtr = context;
   2707 
   2708         promiseAndPipeline.promise.then(
   2709             [contextPtr]() {
   2710               contextPtr->sendReturn();
   2711             }, [contextPtr](kj::Exception&& exception) {
   2712               contextPtr->sendErrorReturn(kj::mv(exception));
   2713             }).catch_([&](kj::Exception&& exception) {
   2714               // Handle exceptions that occur in sendReturn()/sendErrorReturn().
   2715               taskFailed(kj::mv(exception));
   2716             }).attach(kj::mv(context))
   2717             .exclusiveJoin(kj::mv(cancelPaf.promise))
   2718             .detach([](kj::Exception&&) {});
   2719       }
   2720     }
   2721   }
   2722 
   2723   ClientHook::VoidPromiseAndPipeline startCall(
   2724       uint64_t interfaceId, uint64_t methodId,
   2725       kj::Own<ClientHook>&& capability, kj::Own<CallContextHook>&& context) {
            // Delivers one call to `capability`: invokes the method identified by `interfaceId` and
            // `methodId`, hands ownership of `context` to the callee, and returns the resulting
            // promise/pipeline pair unchanged.
   2726     return capability->call(interfaceId, methodId, kj::mv(context));
   2727   }
   2728 
   2729   kj::Maybe<kj::Own<ClientHook>> getMessageTarget(const rpc::MessageTarget::Reader& target) {
            // Maps a `MessageTarget` sent by the peer to the capability it designates: either one of
            // our exports (`importedCap`) or a capability pipelined out of the answer to one of the
            // peer's earlier calls (`promisedAnswer`).
            //
            // Every failure -- unknown export ID, question ID that is not active, transform that
            // toPipelineOps() rejects, unrecognized target type -- is reported through
            // KJ_REQUIRE / KJ_FAIL_REQUIRE, and null is returned if that report does not throw.
   2730     switch (target.which()) {
   2731       case rpc::MessageTarget::IMPORTED_CAP: {
   2732         KJ_IF_MAYBE(exp, exports.find(target.getImportedCap())) {
   2733           return exp->clientHook->addRef();
   2734         } else {
   2735           KJ_FAIL_REQUIRE("Message target is not a current export ID.") { return nullptr; }
   2736         }
   2737         break;
   2738       }
   2739
   2740       case rpc::MessageTarget::PROMISED_ANSWER: {
   2741         auto promisedAnswer = target.getPromisedAnswer();
   2742         kj::Own<PipelineHook> pipeline;
                // Look the answer up with find(), as handleReturn() and handleFinish() do, rather
                // than indexing with operator[]. The question ID is chosen by the peer, so an ID
                // that is not in the table must be rejected without first creating an entry for it.
                // NOTE(review): assumes the table's operator[] creates missing entries -- confirm
                // against its definition earlier in this file.
   2743         KJ_IF_MAYBE(base, answers.find(promisedAnswer.getQuestionId())) {
   2744           KJ_REQUIRE(base->active, "PromisedAnswer.questionId is not a current question.") {
   2745             return nullptr;
   2746           }
   2747           KJ_IF_MAYBE(p, base->pipeline) {
   2748             pipeline = p->get()->addRef();
   2749           } else {
   2750             pipeline = newBrokenPipeline(KJ_EXCEPTION(FAILED,
   2751                 "Pipeline call on a request that returned no capabilities or was already closed."));
   2752           }
   2753         } else {
   2754           KJ_FAIL_REQUIRE("PromisedAnswer.questionId is not a current question.") {
   2755             return nullptr;
   2756           }
   2757         }
   2758
   2759         KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
   2760           return pipeline->getPipelinedCap(*ops);
   2761         } else {
   2762           // Exception already thrown.
   2763           return nullptr;
   2764         }
   2765       }
   2766
   2767       default:
   2768         KJ_FAIL_REQUIRE("Unknown message target type.", target) { return nullptr; }
   2769     }
   2770
   2771     KJ_UNREACHABLE;
   2772   }
   2773 
   2774   void handleReturn(kj::Own<IncomingRpcMessage>&& message, const rpc::Return::Reader& ret) {
            // Handles a `Return` message: the peer's reply to the question `ret.getAnswerId()`.
            // A reply for an ID that is not in the question table, or a second reply for the same
            // question, is reported as a failed requirement.
            //
            // While we still hold a reference to the question (`selfRef` is set) the reply is
            // delivered by fulfilling or rejecting `questionRef`. If `selfRef` is null the question
            // was canceled on our side earlier and its table entry is erased instead.
            //
   2775     // Transitive destructors can end up manipulating the question table and invalidating our
   2776     // pointer into it, so make sure these destructors run later.
   2777     kj::Array<ExportId> exportsToRelease;
   2778     KJ_DEFER(releaseExports(exportsToRelease));
   2779     kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> promiseToRelease;
   2780
   2781     KJ_IF_MAYBE(question, questions.find(ret.getAnswerId())) {
   2782       KJ_REQUIRE(question->isAwaitingReturn, "Duplicate Return.") { return; }
   2783       question->isAwaitingReturn = false;
   2784
              // Depending on what the peer asked for, either pass the exports recorded in
              // `paramExports` to the deferred releaseExports() call or just forget the list.
   2785       if (ret.getReleaseParamCaps()) {
   2786         exportsToRelease = kj::mv(question->paramExports);
   2787       } else {
   2788         question->paramExports = nullptr;
   2789       }
   2790
   2791       KJ_IF_MAYBE(questionRef, question->selfRef) {
   2792         switch (ret.which()) {
   2793           case rpc::Return::RESULTS: {
   2794             KJ_REQUIRE(!question->isTailCall,
   2795                 "Tail call `Return` must set `resultsSentElsewhere`, not `results`.") {
   2796               return;
   2797             }
   2798
                    // `message` is moved into the response object, presumably so that the buffer
                    // that `payload` reads from stays alive as long as the response does.
   2799             auto payload = ret.getResults();
   2800             auto capTableArray = receiveCaps(payload.getCapTable(), message->getAttachedFds());
   2801             questionRef->fulfill(kj::refcounted<RpcResponseImpl>(
   2802                 *this, kj::addRef(*questionRef), kj::mv(message),
   2803                 kj::mv(capTableArray), payload.getContent()));
   2804             break;
   2805           }
   2806
   2807           case rpc::Return::EXCEPTION:
   2808             KJ_REQUIRE(!question->isTailCall,
   2809                 "Tail call `Return` must set `resultsSentElsewhere`, not `exception`.") {
   2810               return;
   2811             }
   2812
   2813             questionRef->reject(toException(ret.getException()));
   2814             break;
   2815
   2816           case rpc::Return::CANCELED:
                    // Reaching this branch means we still hold the question, so a claim that it
                    // was canceled is treated as a protocol error.
   2817             KJ_FAIL_REQUIRE("Return message falsely claims call was canceled.") { return; }
   2818             break;
   2819
   2820           case rpc::Return::RESULTS_SENT_ELSEWHERE:
   2821             KJ_REQUIRE(question->isTailCall,
   2822                 "`Return` had `resultsSentElsewhere` but this was not a tail call.") {
   2823               return;
   2824             }
   2825
   2826             // Tail calls are fulfilled with a null pointer.
   2827             questionRef->fulfill(kj::Own<RpcResponse>());
   2828             break;
   2829
   2830           case rpc::Return::TAKE_FROM_OTHER_QUESTION:
                    // The results are held by one of our own answers, i.e. a call the peer made to
                    // us whose results were redirected; its promise becomes this question's result.
   2831             KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
   2832               KJ_IF_MAYBE(response, answer->redirectedResults) {
   2833                 questionRef->fulfill(kj::mv(*response));
   2834                 answer->redirectedResults = nullptr;
   2835
   2836                 KJ_IF_MAYBE(context, answer->callContext) {
   2837                   // Send the `Return` message for the call of which we're taking ownership, so
   2838                   // that the peer knows it can now tear down the call state.
   2839                   context->sendRedirectReturn();
   2840
   2841                   // There are three conditions, all of which must be true, before a call is
   2842                   // canceled:
   2843                   // 1. The RPC opts in by calling context->allowCancellation().
   2844                   // 2. We request cancellation with context->requestCancel().
   2845                   // 3. The final response promise -- which we passed to questionRef->fulfill()
   2846                   //    above -- must be dropped.
   2847                   //
   2848                   // We would like #3 to imply #2. So... we can just make #2 be true.
   2849                   context->requestCancel();
   2850                 }
   2851               } else {
   2852                 KJ_FAIL_REQUIRE("`Return.takeFromOtherQuestion` referenced a call that did not "
   2853                                 "use `sendResultsTo.yourself`.") { return; }
   2854               }
   2855             } else {
   2856               KJ_FAIL_REQUIRE("`Return.takeFromOtherQuestion` had invalid answer ID.") { return; }
   2857             }
   2858
   2859             break;
   2860
   2861           default:
   2862             KJ_FAIL_REQUIRE("Unknown 'Return' type.") { return; }
   2863         }
   2864       } else {
   2865         // This is a response to a question that we canceled earlier.
   2866
   2867         if (ret.isTakeFromOtherQuestion()) {
   2868           // This turned out to be a tail call back to us! We now take ownership of the tail call.
   2869           // Since the caller canceled, we need to cancel out the tail call, if it still exists.
   2870
   2871           KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
   2872             // Indeed, it does still exist.
   2873
   2874             // Throw away the result promise.
                    // (Moving it into `promiseToRelease` postpones its destruction until this
                    // function returns.)
   2875             promiseToRelease = kj::mv(answer->redirectedResults);
   2876
   2877             KJ_IF_MAYBE(context, answer->callContext) {
   2878               // Send the `Return` message for the call of which we're taking ownership, so
   2879               // that the peer knows it can now tear down the call state.
   2880               context->sendRedirectReturn();
   2881
   2882               // Since the caller has been canceled, make sure the callee that we're tailing to
   2883               // gets canceled.
   2884               context->requestCancel();
   2885             }
   2886           }
   2887         }
   2888
   2889         // Looks like this question was canceled earlier, so `Finish` was already sent, with
   2890         // `releaseResultCaps` set true so that we don't have to release them here.  We can go
   2891         // ahead and delete it from the table.
   2892         questions.erase(ret.getAnswerId(), *question);
   2893       }
   2894
   2895     } else {
   2896       KJ_FAIL_REQUIRE("Invalid question ID in Return message.") { return; }
   2897     }
   2898   }
   2899 
   2900   void handleFinish(const rpc::Finish::Reader& finish) {
            // Handles a `Finish` message for the answer `finish.getQuestionId()`: releases the
            // answer's result exports if the peer asks for that, drops its pipeline, and either
            // requests cancellation of the still-running call or erases the answer-table entry.
            //
   2901     // Whatever is taken out of the answer table is parked in these locals so that it is only
   2902     // destroyed while this function returns. Destructors running earlier could modify the
            // answer table and invalidate the entry pointer used below.
   2903     kj::Array<ExportId> deferredExports;
   2904     KJ_DEFER(releaseExports(deferredExports));
   2905     Answer deferredAnswer;
   2906     kj::Maybe<kj::Own<PipelineHook>> deferredPipeline;
   2907
   2908     auto questionId = finish.getQuestionId();
   2909     KJ_IF_MAYBE(answer, answers.find(questionId)) {
   2910       KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
   2911
   2912       if (!finish.getReleaseResultCaps()) {
   2913         answer->resultExports = nullptr;
   2914       } else {
   2915         deferredExports = kj::mv(answer->resultExports);
   2916       }
   2917       deferredPipeline = kj::mv(answer->pipeline);
   2918
   2919       // A call context that is still attached means the call has not finished, so ask it to
   2920       // cancel. Without one the call is done and its entry can leave the table right away.
   2921       KJ_IF_MAYBE(context, answer->callContext) {
   2922         context->requestCancel();
   2923       } else {
   2924         deferredAnswer = answers.erase(questionId);
   2925       }
   2926     } else {
   2927       KJ_FAIL_REQUIRE("'Finish' for invalid question ID.") { return; }
   2928     }
   2929   }
   2930 
   2931   // ---------------------------------------------------------------------------
   2932   // Level 1
   2933 
   2934   void handleResolve(kj::Own<IncomingRpcMessage>&& message, const rpc::Resolve::Reader& resolve) {
   2935     // Handles a `Resolve` message, which reports what the import `resolve.getPromiseId()`
   2936     // turned into: a capability, or an exception. If that import is on the table and still has
   2937     // an unfulfilled promise, the promise is fulfilled or rejected to match. A `Resolve` for an
   2938     // ID that is not on the table at all is ignored.
   2939     kj::Own<ClientHook> resolvedCap;
   2940     kj::Maybe<kj::Exception> failure;
   2941
   2942     // First decode what the promise resolved to.
   2943     auto outcome = resolve.which();
   2944     if (outcome == rpc::Resolve::CAP) {
   2945       KJ_IF_MAYBE(cap, receiveCap(resolve.getCap(), message->getAttachedFds())) {
   2946         resolvedCap = kj::mv(*cap);
   2947       } else {
   2948         KJ_FAIL_REQUIRE("'Resolve' contained 'CapDescriptor.none'.") { return; }
   2949       }
   2950     } else if (outcome == rpc::Resolve::EXCEPTION) {
   2951       // The promise must really be rejected. Substituting a broken capability for
   2952       // `resolvedCap` would make PromiseClient::Resolve() believe that the remote promise
   2953       // resolved to a local capability, and therefore that a Disembargo is needed.
   2954       failure = toException(resolve.getException());
   2955     } else {
   2956       KJ_FAIL_REQUIRE("Unknown 'Resolve' type.") { return; }
   2957     }
   2958
   2959     // Then settle the import's promise, provided the import is still on the table.
   2960     KJ_IF_MAYBE(import, imports.find(resolve.getPromiseId())) {
   2961       KJ_IF_MAYBE(fulfiller, import->promiseFulfiller) {
   2962         // This import really is a promise that nobody has fulfilled yet.
   2963         KJ_IF_MAYBE(e, failure) {
   2964           fulfiller->get()->reject(kj::mv(*e));
   2965         } else {
   2966           fulfiller->get()->fulfill(kj::mv(resolvedCap));
   2967         }
   2968       } else if (import->importClient != nullptr) {
   2969         // The entry looks like a valid import, but it was never expected to be a promise, so
   2970         // the peer had no business resolving it.
   2971         KJ_FAIL_REQUIRE("Got 'Resolve' for a non-promise import.") { break; }
   2972       }
   2973     }
   2974   }
   2975 
   2976   void handleRelease(const rpc::Release::Reader& release) {
            // Handles a `Release` message by dropping `release.getReferenceCount()` references from
            // the export `release.getId()`.
   2977     releaseExport(release.getId(), release.getReferenceCount());
   2978   }
   2979 
   2980   void releaseExport(ExportId id, uint refcount) {
            // Drops `refcount` references from export `id`. Once the count reaches zero the export
            // is removed from both `exportsByCap` and the export table. An unknown ID, or an attempt
            // to drop more references than are held, is reported as a failed requirement and
            // changes nothing.
   2981     KJ_IF_MAYBE(exp, exports.find(id)) {
   2982       KJ_REQUIRE(refcount <= exp->refcount, "Tried to drop export's refcount below zero.") {
   2983         return;
   2984       }
   2985
   2986       exp->refcount -= refcount;
   2987       if (exp->refcount != 0) {
                // References remain, so the entry stays in the table.
   2988         return;
   2989       }
   2990
   2991       exportsByCap.erase(exp->clientHook);
   2992       exports.erase(id, *exp);
   2993     } else {
   2994       KJ_FAIL_REQUIRE("Tried to release invalid export ID.") { return; }
   2995     }
   2996   }
   2997 
   2998   void releaseExports(kj::ArrayPtr<ExportId> exports) {
            // Releases one reference on each listed export ID via releaseExport(). Note that the
            // parameter `exports` hides the member of the same name inside this function.
   2999     for (auto exportId: exports) {
   3000       releaseExport(exportId, 1);
   3001     }
   3002   }
   3003 
   3004   void handleDisembargo(const rpc::Disembargo::Reader& disembargo) {
            // Handles a `Disembargo` message.
            //
            // senderLoopback: the target is looked up, followed through getResolved() as far as it
            // goes, and must end at a capability whose brand is this connection. A `Disembargo`
            // carrying the same embargo ID as `receiverLoopback` is then sent from an evalLast()
            // task.
            //
            // receiverLoopback: the embargo ID must be in our `embargoes` table; that embargo's
            // fulfiller is fulfilled and the table entry erased.
            //
            // Any other context type is reported as unimplemented.
   3005     auto context = disembargo.getContext();
   3006     switch (context.which()) {
   3007       case rpc::Disembargo::Context::SENDER_LOOPBACK: {
   3008         kj::Own<ClientHook> target;
   3009
   3010         KJ_IF_MAYBE(t, getMessageTarget(disembargo.getTarget())) {
   3011           target = kj::mv(*t);
   3012         } else {
   3013           // Exception already reported.
   3014           return;
   3015         }
   3016
                // Chase getResolved() until it reports nothing further.
   3017         for (;;) {
   3018           KJ_IF_MAYBE(r, target->getResolved()) {
   3019             target = r->addRef();
   3020           } else {
   3021             break;
   3022           }
   3023         }
   3024
   3025         KJ_REQUIRE(target->getBrand() == this,
   3026                    "'Disembargo' of type 'senderLoopback' sent to an object that does not point "
   3027                    "back to the sender.") {
   3028           return;
   3029         }
   3030
   3031         EmbargoId embargoId = context.getSenderLoopback();
   3032
   3033         // We need to insert an evalLast() here to make sure that any pending calls towards this
   3034         // cap have had time to find their way through the event loop.
   3035         tasks.add(canceler.wrap(kj::evalLast(kj::mvCapture(
   3036             target, [this,embargoId](kj::Own<ClientHook>&& target) {
                  // Nothing to send on if the connection has left the Connected state by the time
                  // this task runs.
   3037           if (!connection.is<Connected>()) {
   3038             return;
   3039           }
   3040
                  // NOTE(review): this downcast presumably relies on the brand check made before
                  // the task was queued -- confirm.
   3041           RpcClient& downcasted = kj::downcast<RpcClient>(*target);
   3042
   3043           auto message = connection.get<Connected>()->newOutgoingMessage(
   3044               messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
   3045           auto builder = message->getBody().initAs<rpc::Message>().initDisembargo();
   3046
   3047           {
   3048             auto redirect = downcasted.writeTarget(builder.initTarget());
   3049
   3050             // Disembargoes should only be sent to capabilities that were previously the subject of
   3051             // a `Resolve` message.  But `writeTarget` only ever returns non-null when called on
   3052             // a PromiseClient.  The code which sends `Resolve` and `Return` should have replaced
   3053             // any promise with a direct node in order to solve the Tribble 4-way race condition.
   3054             // See the documentation of Disembargo in rpc.capnp for more.
   3055             KJ_REQUIRE(redirect == nullptr,
   3056                        "'Disembargo' of type 'senderLoopback' sent to an object that does not "
   3057                        "appear to have been the subject of a previous 'Resolve' message.") {
   3058               return;
   3059             }
   3060           }
   3061
   3062           builder.getContext().setReceiverLoopback(embargoId);
   3063
   3064           message->send();
   3065         }))));
   3066
   3067         break;
   3068       }
   3069
   3070       case rpc::Disembargo::Context::RECEIVER_LOOPBACK: {
   3071         KJ_IF_MAYBE(embargo, embargoes.find(context.getReceiverLoopback())) {
   3072           KJ_ASSERT_NONNULL(embargo->fulfiller)->fulfill();
   3073           embargoes.erase(context.getReceiverLoopback(), *embargo);
   3074         } else {
   3075           KJ_FAIL_REQUIRE("Invalid embargo ID in 'Disembargo.context.receiverLoopback'.") {
   3076             return;
   3077           }
   3078         }
   3079         break;
   3080       }
   3081
   3082       default:
   3083         KJ_FAIL_REQUIRE("Unimplemented Disembargo type.", disembargo) { return; }
   3084     }
   3085   }
   3086 
   3087   // ---------------------------------------------------------------------------
   3088   // Level 2
   3089 };
   3090 
   3091 }  // namespace
   3092 
   3093 class RpcSystemBase::Impl final: private BootstrapFactoryBase, private kj::TaskSet::ErrorHandler {
          // Implementation object behind RpcSystemBase. Keeps one RpcConnectionState per network
          // connection in `connections`, runs the accept loop, and -- when built from a bootstrap
          // interface or a restorer instead of a factory -- serves as its own BootstrapFactoryBase.
   3094 public:
   3095   Impl(VatNetworkBase& network, kj::Maybe<Capability::Client> bootstrapInterface)
   3096       : network(network), bootstrapInterface(kj::mv(bootstrapInterface)),
   3097         bootstrapFactory(*this), tasks(*this) {
   3098     startAcceptLoop();
   3099   }
   3100   Impl(VatNetworkBase& network, BootstrapFactoryBase& bootstrapFactory)
   3101       : network(network), bootstrapFactory(bootstrapFactory), tasks(*this) {
   3102     startAcceptLoop();
   3103   }
   3104   Impl(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
   3105       : network(network), bootstrapFactory(*this), restorer(restorer), tasks(*this) {
   3106     startAcceptLoop();
   3107   }
   3108
   3109   ~Impl() noexcept(false) {
   3110     unwindDetector.catchExceptionsIfUnwinding([&]() {
   3111       if (connections.empty()) return;
   3112
              // std::unordered_map copes badly with element destructors that throw, so every
              // connection state is disconnected, moved out of the map and destroyed from here.
   3113       kj::Vector<kj::Own<RpcConnectionState>> doomed(connections.size());
   3114       kj::Exception reason = KJ_EXCEPTION(DISCONNECTED, "RpcSystem was destroyed.");
   3115       for (auto& slot: connections) {
   3116         slot.second->disconnect(kj::cp(reason));
   3117         doomed.add(kj::mv(slot.second));
   3118       }
   3119     });
   3120   }
   3121
   3122   Capability::Client bootstrap(AnyStruct::Reader vatId) {
            // Currently just restore() with a null object ID, which is equivalent. The plan is to
            // implement bootstrap() directly once restore() has been removed.
   3123     return restore(vatId, AnyPointer::Reader());
   3124   }
   3125
   3126   Capability::Client restore(AnyStruct::Reader vatId, AnyPointer::Reader objectId) {
   3127     KJ_IF_MAYBE(connection, network.baseConnect(vatId)) {
   3128       RpcConnectionState& state = getConnectionState(kj::mv(*connection));
   3129       return Capability::Client(state.restore(objectId));
   3130     }
   3131
   3132     if (objectId.isNull()) {
              // No connection came back because `vatId` names this very vat, which also makes it
              // usable as the client ID handed to baseCreateFor().
   3133       return bootstrapFactory.baseCreateFor(vatId);
   3134     }
   3135
   3136     KJ_IF_MAYBE(r, restorer) {
   3137       return r->baseRestore(objectId);
   3138     }
   3139
   3140     return Capability::Client(newBrokenCap(
   3141         "This vat only supports a bootstrap interface, not the old Cap'n-Proto-0.4-style "
   3142         "named exports."));
   3143   }
   3144
   3145   void setFlowLimit(size_t words) {
            // Remember the limit for connections created later and apply it to the current ones.
   3146     flowLimit = words;
   3147     for (auto& entry: connections) {
   3148       entry.second->setFlowLimit(words);
   3149     }
   3150   }
   3151
   3152   void setTraceEncoder(kj::Function<kj::String(const kj::Exception&)> func) {
   3153     traceEncoder = kj::mv(func);
   3154   }
   3155
   3156   kj::Promise<void> run() {
            // Hands the accept-loop promise over to the caller (it is moved out of this object).
   3157     return kj::mv(acceptLoopPromise);
   3158   }
   3159
   3160 private:
   3161   VatNetworkBase& network;
   3162   kj::Maybe<Capability::Client> bootstrapInterface;
   3163   BootstrapFactoryBase& bootstrapFactory;
   3164   kj::Maybe<SturdyRefRestorerBase&> restorer;
   3165   size_t flowLimit = kj::maxValue;
   3166   kj::Maybe<kj::Function<kj::String(const kj::Exception&)>> traceEncoder;
   3167   kj::Promise<void> acceptLoopPromise = nullptr;
   3168   kj::TaskSet tasks;
   3169
   3170   using ConnectionMap =
   3171       std::unordered_map<VatNetworkBase::Connection*, kj::Own<RpcConnectionState>>;
   3172   ConnectionMap connections;
   3173
   3174   kj::UnwindDetector unwindDetector;
   3175
   3176   void startAcceptLoop() {
            // Shared tail of all three constructors: starts accepting connections right away and
            // logs the error if the loop ever fails.
   3177     acceptLoopPromise = acceptLoop().eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
   3178   }
   3179
   3180   RpcConnectionState& getConnectionState(kj::Own<VatNetworkBase::Connection>&& connection) {
            // Returns the state object for `connection`, creating and registering one on first use.
   3181     auto existing = connections.find(connection);
   3182     if (existing != connections.end()) {
   3183       return *existing->second;
   3184     }
   3185
   3186     VatNetworkBase::Connection* connectionPtr = connection;
   3187     auto onDisconnect = kj::newPromiseAndFulfiller<RpcConnectionState::DisconnectInfo>();
   3188     tasks.add(onDisconnect.promise
   3189         .then([this,connectionPtr](RpcConnectionState::DisconnectInfo info) {
   3190       connections.erase(connectionPtr);
   3191       tasks.add(kj::mv(info.shutdownPromise));
   3192     }));
   3193     auto newState = kj::refcounted<RpcConnectionState>(
   3194         bootstrapFactory, restorer, kj::mv(connection),
   3195         kj::mv(onDisconnect.fulfiller), flowLimit, traceEncoder);
   3196     RpcConnectionState& result = *newState;
   3197     connections.emplace(connectionPtr, kj::mv(newState));
   3198     return result;
   3199   }
   3200
   3201   kj::Promise<void> acceptLoop() {
   3202     return network.baseAccept().then(
   3203         [this](kj::Own<VatNetworkBase::Connection>&& connection) {
   3204       getConnectionState(kj::mv(connection));
   3205       return acceptLoop();
   3206     });
   3207   }
   3208
   3209   Capability::Client baseCreateFor(AnyStruct::Reader clientId) override {
            // BootstrapFactory::baseCreateFor() expressed through `bootstrapInterface` or `restorer`,
            // for the case where one of those was supplied in place of a real `bootstrapFactory`.
   3210     KJ_IF_MAYBE(cap, bootstrapInterface) {
   3211       return *cap;
   3212     }
   3213     KJ_IF_MAYBE(r, restorer) {
   3214       return r->baseRestore(AnyPointer::Reader());
   3215     }
   3216     return KJ_EXCEPTION(FAILED, "This vat does not expose any public/bootstrap interfaces.");
   3217   }
   3218
   3219   void taskFailed(kj::Exception&& exception) override {
   3220     KJ_LOG(ERROR, exception);
   3221   }
   3222 };
   3223 
   3224 RpcSystemBase::RpcSystemBase(VatNetworkBase& network,
   3225                              kj::Maybe<Capability::Client> bootstrapInterface)
   3226     : impl(kj::heap<Impl>(network, kj::mv(bootstrapInterface))) {}
        // Builds the Impl around an optional bootstrap capability.
   3227 RpcSystemBase::RpcSystemBase(VatNetworkBase& network,
   3228                              BootstrapFactoryBase& bootstrapFactory)
   3229     : impl(kj::heap<Impl>(network, bootstrapFactory)) {}
        // Builds the Impl around a caller-supplied bootstrap factory.
   3230 RpcSystemBase::RpcSystemBase(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
   3231     : impl(kj::heap<Impl>(network, restorer)) {}
        // Builds the Impl around a SturdyRef restorer.
   3232 RpcSystemBase::RpcSystemBase(RpcSystemBase&& other) noexcept = default;
   3233 RpcSystemBase::~RpcSystemBase() noexcept(false) {}
        // The destructor is declared noexcept(false), as is ~Impl().
   3234 
   3235 Capability::Client RpcSystemBase::baseBootstrap(AnyStruct::Reader vatId) {
          // Forwards to Impl::bootstrap() for the vat identified by `vatId`.
   3236   return impl->bootstrap(vatId);
   3237 }
   3238 
   3239 Capability::Client RpcSystemBase::baseRestore(
   3240     AnyStruct::Reader hostId, AnyPointer::Reader objectId) {
          // Forwards to Impl::restore(), passing `hostId` as the vat ID.
   3241   return impl->restore(hostId, objectId);
   3242 }
   3243 
   3244 void RpcSystemBase::baseSetFlowLimit(size_t words) {
          // Forwards the flow limit, expressed in words, to Impl::setFlowLimit(). Written as a
          // plain call, like setTraceEncoder(), rather than returning a void expression.
   3245   impl->setFlowLimit(words);
   3246 }
   3247 
   3248 void RpcSystemBase::setTraceEncoder(kj::Function<kj::String(const kj::Exception&)> func) {
          // Hands `func` over to Impl::setTraceEncoder().
   3249   impl->setTraceEncoder(kj::mv(func));
   3250 }
   3251 
   3252 kj::Promise<void> RpcSystemBase::run() {
          // Returns the promise obtained from Impl::run(), which moves the accept-loop promise out
          // of the Impl.
          // NOTE(review): presumably meant to be called at most once -- confirm.
   3253   return impl->run();
   3254 }
   3255 
   3256 }  // namespace _ (private)
   3257 
   3258 // =======================================================================================
   3259 
   3260 namespace {
   3261 
   3262 class WindowFlowController final: public RpcFlowController, private kj::TaskSet::ErrorHandler {
          // RpcFlowController that bounds the number of bytes sent but not yet acknowledged by a
          // window whose size is queried from a WindowGetter.
   3263 public:
   3264   explicit WindowFlowController(RpcFlowController::WindowGetter& windowGetter)
   3265       : windowGetter(windowGetter), tasks(*this) {
   3266     state.init<Running>();
   3267   }
   3268
   3269   kj::Promise<void> send(kj::Own<OutgoingRpcMessage> message, kj::Promise<void> ack) override {
            // Sends `message` immediately and counts its size as in flight until `ack` resolves.
            // The returned promise is ready at once while isReady() holds; otherwise it resolves
            // when a later ack makes isReady() true again. Once an ack has failed, the returned
            // promise is rejected with a copy of that failure.
   3270     auto size = message->sizeInWords() * sizeof(capnp::word);
   3271     maxMessageSize = kj::max(size, maxMessageSize);
   3272
   3273     // We are REQUIRED to send the message NOW to maintain correct ordering.
   3274     message->send();
   3275
   3276     inFlight += size;
   3277     tasks.add(ack.then([this, size]() {
   3278       inFlight -= size;
   3279       KJ_SWITCH_ONEOF(state) {
   3280         KJ_CASE_ONEOF(blockedSends, Running) {
   3281           if (isReady()) {
   3282             // Release all fulfillers.
   3283             for (auto& fulfiller: blockedSends) {
   3284               fulfiller->fulfill();
   3285             }
   3286             blockedSends.clear();
   3287
   3288           }
   3289
   3290           KJ_IF_MAYBE(f, emptyFulfiller) {
   3291             if (inFlight == 0) {
   3292               f->get()->fulfill(tasks.onEmpty());
   3293             }
   3294           }
   3295         }
   3296         KJ_CASE_ONEOF(exception, kj::Exception) {
   3297           // A previous call failed, but this one -- which was already in-flight at the time --
   3298           // ended up succeeding. That may indicate that the server side is not properly
   3299           // handling streaming error propagation. Nothing much we can do about it here though.
   3300         }
   3301       }
   3302     }));
   3303
   3304     KJ_SWITCH_ONEOF(state) {
   3305       KJ_CASE_ONEOF(blockedSends, Running) {
   3306         if (isReady()) {
   3307           return kj::READY_NOW;
   3308         } else {
   3309           auto paf = kj::newPromiseAndFulfiller<void>();
   3310           blockedSends.add(kj::mv(paf.fulfiller));
   3311           return kj::mv(paf.promise);
   3312         }
   3313       }
   3314       KJ_CASE_ONEOF(exception, kj::Exception) {
   3315         return kj::cp(exception);
   3316       }
   3317     }
   3318     KJ_UNREACHABLE;
   3319   }
   3320
   3321   kj::Promise<void> waitAllAcked() override {
            // While sends are blocked, returns a promise that the ack handler in send() fulfills
            // (with tasks.onEmpty()) once nothing is in flight. Otherwise returns tasks.onEmpty()
            // directly.
            // NOTE(review): nothing in this class fulfills `emptyFulfiller` after `state` has become
            // an exception -- confirm that a waiter cannot be left hanging in that case.
   3322     KJ_IF_MAYBE(q, state.tryGet<Running>()) {
   3323       if (!q->empty()) {
   3324         auto paf = kj::newPromiseAndFulfiller<kj::Promise<void>>();
   3325         emptyFulfiller = kj::mv(paf.fulfiller);
   3326         return kj::mv(paf.promise);
   3327       }
   3328     }
   3329     return tasks.onEmpty();
   3330   }
   3331
   3332 private:
   3333   RpcFlowController::WindowGetter& windowGetter;
   3334   size_t inFlight = 0;
            // Bytes sent whose ack has not resolved yet.
   3335   size_t maxMessageSize = 0;
            // Size in bytes of the largest message passed to send() so far.
   3336
   3337   typedef kj::Vector<kj::Own<kj::PromiseFulfiller<void>>> Running;
   3338   kj::OneOf<Running, kj::Exception> state;
            // Running: fulfillers of the send() promises that are currently blocked.
            // kj::Exception: the first ack failure, returned by every later send().
   3339
   3340   kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Promise<void>>>> emptyFulfiller;
   3341
   3342   kj::TaskSet tasks;
   3343
   3344   void taskFailed(kj::Exception&& exception) override {
   3345     KJ_SWITCH_ONEOF(state) {
   3346       KJ_CASE_ONEOF(blockedSends, Running) {
   3347         // Fail out all pending sends.
   3348         for (auto& fulfiller: blockedSends) {
   3349           fulfiller->reject(kj::cp(exception));
   3350         }
   3351         // Fail out all future sends.
   3352         state = kj::mv(exception);
   3353       }
   3354       KJ_CASE_ONEOF(exception, kj::Exception) {
   3355         // ignore redundant exception
   3356       }
   3357     }
   3358   }
   3359
   3360   bool isReady() {
   3361     // We extend the window by maxMessageSize to avoid a pathological situation when a message
   3362     // is larger than the window size. Otherwise, after sending that message, we would end up
   3363     // not sending any others until the ack was received, wasting a round trip's worth of
   3364     // bandwidth.
            //
            // The second test is written as a subtraction so that it cannot wrap around: it is only
            // evaluated when inFlight > maxMessageSize, whereas `getWindow() + maxMessageSize` would
            // overflow size_t for a very large window and make the controller block needlessly.
   3365     return inFlight <= maxMessageSize  // avoid getWindow() call if unnecessary
   3366         || inFlight - maxMessageSize < windowGetter.getWindow();
   3367   }
   3368 };
   3369 
   3370 class FixedWindowFlowController final
   3371     : public RpcFlowController, public RpcFlowController::WindowGetter {
          // RpcFlowController with a constant window: wraps a WindowFlowController and acts as that
          // controller's WindowGetter, always reporting `windowSize`.
   3372 public:
   3373   explicit FixedWindowFlowController(size_t windowSize): windowSize(windowSize), inner(*this) {}
            // `explicit`, so that a bare size_t never converts to a flow controller implicitly.
            // `windowSize` is declared, and therefore initialized, before `inner`.
   3374
   3375   kj::Promise<void> send(kj::Own<OutgoingRpcMessage> message, kj::Promise<void> ack) override {
   3376     return inner.send(kj::mv(message), kj::mv(ack));
   3377   }
   3378
   3379   kj::Promise<void> waitAllAcked() override {
   3380     return inner.waitAllAcked();
   3381   }
   3382
   3383   size_t getWindow() override { return windowSize; }
   3384
   3385 private:
   3386   size_t windowSize;
   3387   WindowFlowController inner;
   3388 };
   3389 
   3390 }  // namespace
   3391 
   3392 kj::Own<RpcFlowController> RpcFlowController::newFixedWindowController(size_t windowSize) {
          // Creates a heap-allocated FixedWindowFlowController whose window is always `windowSize`.
   3393   return kj::heap<FixedWindowFlowController>(windowSize);
   3394 }
   3395 kj::Own<RpcFlowController> RpcFlowController::newVariableWindowController(WindowGetter& getter) {
          // Creates a heap-allocated WindowFlowController that queries `getter` for its window.
          // The controller stores a reference to `getter`, so `getter` must outlive it.
   3396   return kj::heap<WindowFlowController>(getter);
   3397 }
   3398 
   3399 }  // namespace capnp