capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

rpc-twoparty.h (11711B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #pragma once
     23 
     24 #include "rpc.h"
     25 #include "message.h"
     26 #include <kj/async-io.h>
     27 #include <capnp/serialize-async.h>
     28 #include <capnp/rpc-twoparty.capnp.h>
     29 #include <kj/one-of.h>
     30 
     31 CAPNP_BEGIN_HEADER
     32 
     33 namespace capnp {
     34 
     35 namespace rpc {
     36   namespace twoparty {
     37     using SturdyRefHostId = VatId;  // Alias retained for backwards-compatibility with version 0.4.
     38   }  // namespace twoparty
     39 }  // namespace rpc
     40 
     41 using TwoPartyVatNetworkBase = VatNetwork<
     42     rpc::twoparty::VatId, rpc::twoparty::ProvisionId, rpc::twoparty::RecipientId,
     43     rpc::twoparty::ThirdPartyCapId, rpc::twoparty::JoinResult>;
        // `VatNetwork` instantiated with the ID types declared in `rpc::twoparty`.
     44 
     45 class TwoPartyVatNetwork: public TwoPartyVatNetworkBase,
     46                           private TwoPartyVatNetworkBase::Connection,
     47                           private RpcFlowController::WindowGetter {
     48   // A `VatNetwork` that consists of exactly two parties communicating over an arbitrary byte
     49   // stream.  This is used to implement the common case of a client/server network.
     50   //
     51   // See `ez-rpc.h` for a simple interface for setting up two-party clients and servers.
     52   // Use `TwoPartyVatNetwork` only if you need the advanced features.
     53 
     54 public:
     55   TwoPartyVatNetwork(MessageStream& msgStream,
     56                      rpc::twoparty::Side side, ReaderOptions receiveOptions = ReaderOptions(),
     57                      const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock());
     58   TwoPartyVatNetwork(MessageStream& msgStream, uint maxFdsPerMessage,
     59                      rpc::twoparty::Side side, ReaderOptions receiveOptions = ReaderOptions(),
     60                      const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock());
     61   TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty::Side side,
     62                      ReaderOptions receiveOptions = ReaderOptions(),
     63                      const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock());
     64   TwoPartyVatNetwork(kj::AsyncCapabilityStream& stream, uint maxFdsPerMessage,
     65                      rpc::twoparty::Side side, ReaderOptions receiveOptions = ReaderOptions(),
     66                      const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock());
     67   // To support FD passing, pass an AsyncCapabilityStream or a MessageStream which supports
     68   // fd passing, and `maxFdsPerMessage`, which specifies the maximum number of file descriptors
     69   // to accept from the peer in any one RPC message. It is important to keep maxFdsPerMessage
     70   // low in order to stop DoS attacks that fill up your FD table.
     71   //
     72   // Note that this limit applies only to incoming messages; outgoing messages are allowed to have
     73   // more FDs. Sometimes it makes sense to enforce a limit of zero in one direction while having
     74   // a non-zero limit in the other. For example, in a supervisor/sandbox scenario, typically there
     75   // are many use cases for passing FDs from supervisor to sandbox but no use case for vice versa.
     76   // The supervisor may be configured not to accept any FDs from the sandbox in order to reduce
     77   // risk of DoS attacks.
     78   //
     79   // `clock` is used for calculating the oldest queued message age, which is a useful metric for
     80   // detecting queue overload.
     81 
     82   KJ_DISALLOW_COPY(TwoPartyVatNetwork);
     83 
     84   kj::Promise<void> onDisconnect() { return disconnectPromise.addBranch(); }
     85   // Returns a promise that resolves when the peer disconnects.
     86 
     87   rpc::twoparty::Side getSide() { return side; }
          // Returns the `rpc::twoparty::Side` value held in this object's `side` member.
     88 
     89   size_t getCurrentQueueSize() { return currentQueueSize; }
     90   // Get the number of bytes worth of outgoing messages that are currently queued in memory waiting
     91   // to be sent on this connection. This may be useful for backpressure.
     92 
     93   size_t getCurrentQueueCount() { return currentQueueCount; }
     94   // Get the count of outgoing messages that are currently queued in memory waiting
     95   // to be sent on this connection. This may be useful for backpressure.
     96 
     97   kj::Duration getOutgoingMessageWaitTime();
     98   // Get how long the current outgoing message has been waiting to be sent on this connection.
     99   // Returns 0 if the queue is empty. This may be useful for backpressure.
    100 
    101   // implements VatNetwork -----------------------------------------------------
    102 
    103   kj::Maybe<kj::Own<TwoPartyVatNetworkBase::Connection>> connect(
    104       rpc::twoparty::VatId::Reader ref) override;
    105   kj::Promise<kj::Own<TwoPartyVatNetworkBase::Connection>> accept() override;
    106 
    107 private:
    108   class OutgoingMessageImpl;
    109   class IncomingMessageImpl;
          // Nested helper classes, forward-declared here and defined outside this class body.
    110 
    111   kj::OneOf<MessageStream*, kj::Own<MessageStream>> stream;
    112   // The underlying stream, which we may or may not own. Get a reference to
    113   // this with getStream, rather than reading it directly.
    114 
    115   uint maxFdsPerMessage;
    116   rpc::twoparty::Side side;
    117   MallocMessageBuilder peerVatId;
    118   ReaderOptions receiveOptions;
    119   bool accepted = false;
    120 
    121   bool solSndbufUnimplemented = false;
    122   // Whether stream.getsockopt(SO_SNDBUF) has been observed to throw UNIMPLEMENTED.
    123 
    124   kj::Canceler readCanceler;
    125   kj::Maybe<kj::Exception> readCancelReason;
    126   // Used to propagate write errors into (permanent) read errors.
    127 
    128   kj::Maybe<kj::Promise<void>> previousWrite;
    129   // Resolves when the previous write completes.  This effectively serves as the write queue.
    130   // Becomes null when shutdown() is called.
    131 
    132   kj::Own<kj::PromiseFulfiller<kj::Own<TwoPartyVatNetworkBase::Connection>>> acceptFulfiller;
    133   // Fulfiller for the promise returned by acceptConnectionAsRefHost() on the client side, or the
    134   // second call on the server side.  Never fulfilled, because there is only one connection.
          // NOTE(review): no acceptConnectionAsRefHost() is declared in this class; the fulfilled type
          // matches the promise returned by accept() above, so the method name in this comment looks
          // stale -- confirm against the implementation.
    135 
    136   kj::ForkedPromise<void> disconnectPromise = nullptr;
          // Backs onDisconnect(): every call hands out a new branch of this forked promise.
    137 
    138   size_t currentQueueSize = 0;
    139   size_t currentQueueCount = 0;
    140   const kj::MonotonicClock& clock;
    141   kj::TimePoint currentOutgoingMessageSendTime;
          // `currentQueueSize` and `currentQueueCount` are the values returned by getCurrentQueueSize()
          // and getCurrentQueueCount().  NOTE(review): `clock` and `currentOutgoingMessageSendTime`
          // presumably feed getOutgoingMessageWaitTime() -- confirm in the implementation.
    142 
    143   class FulfillerDisposer: public kj::Disposer {
    144     // Hack:  TwoPartyVatNetwork is both a VatNetwork and a VatNetwork::Connection.  When the RPC
    145     //   system detects (or initiates) a disconnection, it drops its reference to the Connection.
    146     //   When all references have been dropped, then we want disconnectPromise to be fulfilled.
    147     //   So we hand out Own<Connection>s with this disposer attached, so that we can detect when
    148     //   they are dropped.
    149 
    150   public:
    151     mutable kj::Own<kj::PromiseFulfiller<void>> fulfiller;
    152     mutable uint refcount = 0;
            // NOTE(review): both are `mutable`, presumably so that the const disposeImpl() below can
            // update them -- confirm in the implementation.
    153 
    154     void disposeImpl(void* pointer) const override;
    155   };
    156   FulfillerDisposer disconnectFulfiller;
          // The disposer that asConnection() attaches to the Own<Connection> it returns.
    157 
    158 
    159   TwoPartyVatNetwork(
    160       kj::OneOf<MessageStream*, kj::Own<MessageStream>>&& stream,
    161       uint maxFdsPerMessage,
    162       rpc::twoparty::Side side,
    163       ReaderOptions receiveOptions,
    164       const kj::MonotonicClock& clock);
          // Private constructor that takes the stream in the same OneOf form as the `stream` member.
          // NOTE(review): presumably the public constructors delegate here -- confirm in the
          // implementation.
    165 
    166   MessageStream& getStream();
          // Returns a reference to the underlying stream; use this instead of reading `stream` directly.
    167 
    168   kj::Own<TwoPartyVatNetworkBase::Connection> asConnection();
    169   // Returns a pointer to this with the disposer set to disconnectFulfiller.
    170 
    171   // implements Connection -----------------------------------------------------
    172 
    173   kj::Own<RpcFlowController> newStream() override;
    174   rpc::twoparty::VatId::Reader getPeerVatId() override;
    175   kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) override;
    176   kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() override;
    177   kj::Promise<void> shutdown() override;
    178 
    179   // implements WindowGetter ---------------------------------------------------
    180 
    181   size_t getWindow() override;
    182 };
    183 
    184 class TwoPartyServer: private kj::TaskSet::ErrorHandler {
    185   // Convenience class which implements a simple server which accepts connections on a listener
    186   // socket and services them as two-party connections.
    187 
    188 public:
    189   explicit TwoPartyServer(Capability::Client bootstrapInterface);
    190 
    191   void accept(kj::Own<kj::AsyncIoStream>&& connection);
    192   void accept(kj::Own<kj::AsyncCapabilityStream>&& connection, uint maxFdsPerMessage);
    193   // Accepts the connection for servicing.
    194 
    195   kj::Promise<void> accept(kj::AsyncIoStream& connection) KJ_WARN_UNUSED_RESULT;
    196   kj::Promise<void> accept(kj::AsyncCapabilityStream& connection, uint maxFdsPerMessage)
    197       KJ_WARN_UNUSED_RESULT;
    198   // Accept connection without taking ownership. The returned promise resolves when the client
    199   // disconnects. Dropping the promise forcefully cancels the RPC protocol.
    200   //
    201   // You probably can't do anything with `connection` after the RPC protocol has terminated, other
    202   // than to close it. The main reason to use these methods rather than the ownership-taking ones
    203   // is if your stream object becomes invalid outside some scope, so you want to make sure to
    204   // cancel all usage of it before that by cancelling the promise.
    205 
    206   kj::Promise<void> listen(kj::ConnectionReceiver& listener);
    207   // Listens for connections on the given listener. The returned promise never resolves unless an
    208   // exception is thrown while trying to accept. You may discard the returned promise to cancel
    209   // listening.
    210 
    211   kj::Promise<void> listenCapStreamReceiver(
    212       kj::ConnectionReceiver& listener, uint maxFdsPerMessage);
    213   // Listen with support for FD transfers. `listener.accept()` must return instances of
    214   // AsyncCapabilityStream, otherwise this will crash.
    215 
    216   kj::Promise<void> drain() { return tasks.onEmpty(); }
    217   // Resolves when all clients have disconnected.
    218   //
    219   // Only considers clients whose connections TwoPartyServer took ownership of.
    220 
    221 private:
    222   Capability::Client bootstrapInterface;
    223   kj::TaskSet tasks;
          // `tasks` is the task set whose onEmpty() promise drain() returns.
    224 
    225   struct AcceptedConnection;
          // Forward-declared here; defined outside this class body.
    226 
    227   void taskFailed(kj::Exception&& exception) override;
          // Override of the privately inherited kj::TaskSet::ErrorHandler interface.
    228 };
    229 
    230 class TwoPartyClient {
    231   // Convenience class which implements a simple client.
    232 
    233 public:
    234   explicit TwoPartyClient(kj::AsyncIoStream& connection);
    235   explicit TwoPartyClient(kj::AsyncCapabilityStream& connection, uint maxFdsPerMessage);
    236   TwoPartyClient(kj::AsyncIoStream& connection, Capability::Client bootstrapInterface,
    237                  rpc::twoparty::Side side = rpc::twoparty::Side::CLIENT);
    238   TwoPartyClient(kj::AsyncCapabilityStream& connection, uint maxFdsPerMessage,
    239                  Capability::Client bootstrapInterface,
    240                  rpc::twoparty::Side side = rpc::twoparty::Side::CLIENT);
          // The overloads that accept `side` default it to rpc::twoparty::Side::CLIENT.
          // NOTE(review): `connection` is passed by reference, so presumably the caller must keep it
          // alive for as long as this client exists -- confirm in the implementation.
    241 
    242   Capability::Client bootstrap();
    243   // Get the server's bootstrap interface.
    244 
    245   inline kj::Promise<void> onDisconnect() { return network.onDisconnect(); }
          // Forwarded to network.onDisconnect().
    246 
    247   void setTraceEncoder(kj::Function<kj::String(const kj::Exception&)> func);
    248   // Forwarded to rpcSystem.setTraceEncoder().
    249 
    250   size_t getCurrentQueueSize() { return network.getCurrentQueueSize(); }
    251   size_t getCurrentQueueCount() { return network.getCurrentQueueCount(); }
    252   kj::Duration getOutgoingMessageWaitTime() { return network.getOutgoingMessageWaitTime(); }
          // Each of these forwards to the same-named method on `network`.
    253 
    254 private:
    255   TwoPartyVatNetwork network;
    256   RpcSystem<rpc::twoparty::VatId> rpcSystem;
    257 };
    258 
    259 }  // namespace capnp
    260 
    261 CAPNP_END_HEADER