rpc-twoparty.h (11711B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #pragma once 23 24 #include "rpc.h" 25 #include "message.h" 26 #include <kj/async-io.h> 27 #include <capnp/serialize-async.h> 28 #include <capnp/rpc-twoparty.capnp.h> 29 #include <kj/one-of.h> 30 31 CAPNP_BEGIN_HEADER 32 33 namespace capnp { 34 35 namespace rpc { 36 namespace twoparty { 37 typedef VatId SturdyRefHostId; // For backwards-compatibility with version 0.4. 38 } 39 } 40 41 typedef VatNetwork<rpc::twoparty::VatId, rpc::twoparty::ProvisionId, 42 rpc::twoparty::RecipientId, rpc::twoparty::ThirdPartyCapId, rpc::twoparty::JoinResult> 43 TwoPartyVatNetworkBase; 44 45 class TwoPartyVatNetwork: public TwoPartyVatNetworkBase, 46 private TwoPartyVatNetworkBase::Connection, 47 private RpcFlowController::WindowGetter { 48 // A `VatNetwork` that consists of exactly two parties communicating over an arbitrary byte 49 // stream. This is used to implement the common case of a client/server network. 50 // 51 // See `ez-rpc.h` for a simple interface for setting up two-party clients and servers. 52 // Use `TwoPartyVatNetwork` only if you need the advanced features. 53 54 public: 55 TwoPartyVatNetwork(MessageStream& msgStream, 56 rpc::twoparty::Side side, ReaderOptions receiveOptions = ReaderOptions(), 57 const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock()); 58 TwoPartyVatNetwork(MessageStream& msgStream, uint maxFdsPerMessage, 59 rpc::twoparty::Side side, ReaderOptions receiveOptions = ReaderOptions(), 60 const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock()); 61 TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty::Side side, 62 ReaderOptions receiveOptions = ReaderOptions(), 63 const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock()); 64 TwoPartyVatNetwork(kj::AsyncCapabilityStream& stream, uint maxFdsPerMessage, 65 rpc::twoparty::Side side, ReaderOptions receiveOptions = ReaderOptions(), 66 const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock()); 67 // To support FD passing, pass an AsyncCapabilityStream or a MessageStream which supports 68 // fd passing, and `maxFdsPerMessage`, which specifies the maximum number of file descriptors 69 // to accept from the peer in any one RPC message. It is important to keep maxFdsPerMessage 70 // low in order to stop DoS attacks that fill up your FD table. 71 // 72 // Note that this limit applies only to incoming messages; outgoing messages are allowed to have 73 // more FDs. Sometimes it makes sense to enforce a limit of zero in one direction while having 74 // a non-zero limit in the other. For example, in a supervisor/sandbox scenario, typically there 75 // are many use cases for passing FDs from supervisor to sandbox but no use case for vice versa. 76 // The supervisor may be configured not to accept any FDs from the sandbox in order to reduce 77 // risk of DoS attacks. 78 // 79 // clock is used for calculating the oldest queued message age, which is a useful metric for 80 // detecting queue overload 81 82 KJ_DISALLOW_COPY(TwoPartyVatNetwork); 83 84 kj::Promise<void> onDisconnect() { return disconnectPromise.addBranch(); } 85 // Returns a promise that resolves when the peer disconnects. 86 87 rpc::twoparty::Side getSide() { return side; } 88 89 size_t getCurrentQueueSize() { return currentQueueSize; } 90 // Get the number of bytes worth of outgoing messages that are currently queued in memory waiting 91 // to be sent on this connection. This may be useful for backpressure. 92 93 size_t getCurrentQueueCount() { return currentQueueCount; } 94 // Get the count of outgoing messages that are currently queued in memory waiting 95 // to be sent on this connection. This may be useful for backpressure. 96 97 kj::Duration getOutgoingMessageWaitTime(); 98 // Get how long the current outgoing message has been waiting to be sent on this connection. 99 // Returns 0 if the queue is empty. This may be useful for backpressure. 100 101 // implements VatNetwork ----------------------------------------------------- 102 103 kj::Maybe<kj::Own<TwoPartyVatNetworkBase::Connection>> connect( 104 rpc::twoparty::VatId::Reader ref) override; 105 kj::Promise<kj::Own<TwoPartyVatNetworkBase::Connection>> accept() override; 106 107 private: 108 class OutgoingMessageImpl; 109 class IncomingMessageImpl; 110 111 kj::OneOf<MessageStream*, kj::Own<MessageStream>> stream; 112 // The underlying stream, which we may or may not own. Get a reference to 113 // this with getStream, rather than reading it directly. 114 115 uint maxFdsPerMessage; 116 rpc::twoparty::Side side; 117 MallocMessageBuilder peerVatId; 118 ReaderOptions receiveOptions; 119 bool accepted = false; 120 121 bool solSndbufUnimplemented = false; 122 // Whether stream.getsockopt(SO_SNDBUF) has been observed to throw UNIMPLEMENTED. 123 124 kj::Canceler readCanceler; 125 kj::Maybe<kj::Exception> readCancelReason; 126 // Used to propagate write errors into (permanent) read errors. 127 128 kj::Maybe<kj::Promise<void>> previousWrite; 129 // Resolves when the previous write completes. This effectively serves as the write queue. 130 // Becomes null when shutdown() is called. 131 132 kj::Own<kj::PromiseFulfiller<kj::Own<TwoPartyVatNetworkBase::Connection>>> acceptFulfiller; 133 // Fulfiller for the promise returned by acceptConnectionAsRefHost() on the client side, or the 134 // second call on the server side. Never fulfilled, because there is only one connection. 135 136 kj::ForkedPromise<void> disconnectPromise = nullptr; 137 138 size_t currentQueueSize = 0; 139 size_t currentQueueCount = 0; 140 const kj::MonotonicClock& clock; 141 kj::TimePoint currentOutgoingMessageSendTime; 142 143 class FulfillerDisposer: public kj::Disposer { 144 // Hack: TwoPartyVatNetwork is both a VatNetwork and a VatNetwork::Connection. When the RPC 145 // system detects (or initiates) a disconnection, it drops its reference to the Connection. 146 // When all references have been dropped, then we want disconnectPromise to be fulfilled. 147 // So we hand out Own<Connection>s with this disposer attached, so that we can detect when 148 // they are dropped. 149 150 public: 151 mutable kj::Own<kj::PromiseFulfiller<void>> fulfiller; 152 mutable uint refcount = 0; 153 154 void disposeImpl(void* pointer) const override; 155 }; 156 FulfillerDisposer disconnectFulfiller; 157 158 159 TwoPartyVatNetwork( 160 kj::OneOf<MessageStream*, kj::Own<MessageStream>>&& stream, 161 uint maxFdsPerMessage, 162 rpc::twoparty::Side side, 163 ReaderOptions receiveOptions, 164 const kj::MonotonicClock& clock); 165 166 MessageStream& getStream(); 167 168 kj::Own<TwoPartyVatNetworkBase::Connection> asConnection(); 169 // Returns a pointer to this with the disposer set to disconnectFulfiller. 170 171 // implements Connection ----------------------------------------------------- 172 173 kj::Own<RpcFlowController> newStream() override; 174 rpc::twoparty::VatId::Reader getPeerVatId() override; 175 kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) override; 176 kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() override; 177 kj::Promise<void> shutdown() override; 178 179 // implements WindowGetter --------------------------------------------------- 180 181 size_t getWindow() override; 182 }; 183 184 class TwoPartyServer: private kj::TaskSet::ErrorHandler { 185 // Convenience class which implements a simple server which accepts connections on a listener 186 // socket and serices them as two-party connections. 187 188 public: 189 explicit TwoPartyServer(Capability::Client bootstrapInterface); 190 191 void accept(kj::Own<kj::AsyncIoStream>&& connection); 192 void accept(kj::Own<kj::AsyncCapabilityStream>&& connection, uint maxFdsPerMessage); 193 // Accepts the connection for servicing. 194 195 kj::Promise<void> accept(kj::AsyncIoStream& connection) KJ_WARN_UNUSED_RESULT; 196 kj::Promise<void> accept(kj::AsyncCapabilityStream& connection, uint maxFdsPerMessage) 197 KJ_WARN_UNUSED_RESULT; 198 // Accept connection without taking ownership. The returned promise resolves when the client 199 // disconnects. Dropping the promise forcefully cancels the RPC protocol. 200 // 201 // You probably can't do anything with `connection` after the RPC protocol has terminated, other 202 // than to close it. The main reason to use these methods rather than the ownership-taking ones 203 // is if your stream object becomes invalid outside some scope, so you want to make sure to 204 // cancel all usage of it before that by cancelling the promise. 205 206 kj::Promise<void> listen(kj::ConnectionReceiver& listener); 207 // Listens for connections on the given listener. The returned promise never resolves unless an 208 // exception is thrown while trying to accept. You may discard the returned promise to cancel 209 // listening. 210 211 kj::Promise<void> listenCapStreamReceiver( 212 kj::ConnectionReceiver& listener, uint maxFdsPerMessage); 213 // Listen with support for FD transfers. `listener.accept()` must return instances of 214 // AsyncCapabilityStream, otherwise this will crash. 215 216 kj::Promise<void> drain() { return tasks.onEmpty(); } 217 // Resolves when all clients have disconnected. 218 // 219 // Only considers clients whose connections TwoPartyServer took ownership of. 220 221 private: 222 Capability::Client bootstrapInterface; 223 kj::TaskSet tasks; 224 225 struct AcceptedConnection; 226 227 void taskFailed(kj::Exception&& exception) override; 228 }; 229 230 class TwoPartyClient { 231 // Convenience class which implements a simple client. 232 233 public: 234 explicit TwoPartyClient(kj::AsyncIoStream& connection); 235 explicit TwoPartyClient(kj::AsyncCapabilityStream& connection, uint maxFdsPerMessage); 236 TwoPartyClient(kj::AsyncIoStream& connection, Capability::Client bootstrapInterface, 237 rpc::twoparty::Side side = rpc::twoparty::Side::CLIENT); 238 TwoPartyClient(kj::AsyncCapabilityStream& connection, uint maxFdsPerMessage, 239 Capability::Client bootstrapInterface, 240 rpc::twoparty::Side side = rpc::twoparty::Side::CLIENT); 241 242 Capability::Client bootstrap(); 243 // Get the server's bootstrap interface. 244 245 inline kj::Promise<void> onDisconnect() { return network.onDisconnect(); } 246 247 void setTraceEncoder(kj::Function<kj::String(const kj::Exception&)> func); 248 // Forwarded to rpcSystem.setTraceEncoder(). 249 250 size_t getCurrentQueueSize() { return network.getCurrentQueueSize(); } 251 size_t getCurrentQueueCount() { return network.getCurrentQueueCount(); } 252 kj::Duration getOutgoingMessageWaitTime() { return network.getOutgoingMessageWaitTime(); } 253 254 private: 255 TwoPartyVatNetwork network; 256 RpcSystem<rpc::twoparty::VatId> rpcSystem; 257 }; 258 259 } // namespace capnp 260 261 CAPNP_END_HEADER