// async-io.h (47860B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #pragma once 23 24 #include "async.h" 25 #include "function.h" 26 #include "thread.h" 27 #include "timer.h" 28 29 KJ_BEGIN_HEADER 30 31 struct sockaddr; 32 33 namespace kj { 34 35 #if _WIN32 36 class Win32EventPort; 37 class AutoCloseHandle; 38 #else 39 class UnixEventPort; 40 #endif 41 42 class AutoCloseFd; 43 class NetworkAddress; 44 class AsyncOutputStream; 45 class AsyncIoStream; 46 class AncillaryMessage; 47 48 // ======================================================================================= 49 // Streaming I/O 50 51 class AsyncInputStream { 52 // Asynchronous equivalent of InputStream (from io.h). 
53 54 public: 55 virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes); 56 virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0; 57 58 Promise<void> read(void* buffer, size_t bytes); 59 60 virtual Maybe<uint64_t> tryGetLength(); 61 // Get the remaining number of bytes that will be produced by this stream, if known. 62 // 63 // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the 64 // HTTP implementation may need to fall back to Transfer-Encoding: chunked. 65 // 66 // The default implementation always returns null. 67 68 virtual Promise<uint64_t> pumpTo( 69 AsyncOutputStream& output, uint64_t amount = kj::maxValue); 70 // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the 71 // total bytes actually pumped (which is only less than `amount` if EOF was reached). 72 // 73 // Override this if your stream type knows how to pump itself to certain kinds of output 74 // streams more efficiently than via the naive approach. You can use 75 // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match, 76 // delegate to the default implementation. 77 // 78 // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it 79 // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop. 80 81 Promise<Array<byte>> readAllBytes(uint64_t limit = kj::maxValue); 82 Promise<String> readAllText(uint64_t limit = kj::maxValue); 83 // Read until EOF and return as one big byte array or string. Throw an exception if EOF is not 84 // seen before reading `limit` bytes. 85 // 86 // To prevent runaway memory allocation, consider using a more conservative value for `limit` than 87 // the default, particularly on untrusted data streams which may never see EOF. 
88 89 virtual void registerAncillaryMessageHandler(Function<void(ArrayPtr<AncillaryMessage>)> fn); 90 // Register interest in checking for ancillary messages (aka control messages) when reading. 91 // The provided callback will be called whenever any are encountered. The messages passed to 92 // the function do not live beyond when function returns. 93 // Only supported on Unix (the default impl throws UNIMPLEMENTED). Most apps will not use this. 94 95 virtual Maybe<Own<AsyncInputStream>> tryTee(uint64_t limit = kj::maxValue); 96 // Primarily intended as an optimization for the `tee` call. Returns an input stream whose state 97 // is independent from this one but which will return the exact same set of bytes read going 98 // forward. limit is a total limit on the amount of memory, in bytes, which a tee implementation 99 // may use to buffer stream data. An implementation must throw an exception if a read operation 100 // would cause the limit to be exceeded. If tryTee() can see that the new limit is impossible to 101 // satisfy, it should return nullptr so that the pessimized path is taken in newTee. This is 102 // likely to arise if tryTee() is called twice with different limits on the same stream. 103 }; 104 105 class AsyncOutputStream { 106 // Asynchronous equivalent of OutputStream (from io.h). 107 108 public: 109 virtual Promise<void> write(const void* buffer, size_t size) KJ_WARN_UNUSED_RESULT = 0; 110 virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) 111 KJ_WARN_UNUSED_RESULT = 0; 112 113 virtual Maybe<Promise<uint64_t>> tryPumpFrom( 114 AsyncInputStream& input, uint64_t amount = kj::maxValue); 115 // Implements double-dispatch for AsyncInputStream::pumpTo(). 116 // 117 // This method should only be called from within an implementation of pumpTo(). 118 // 119 // This method examines the type of `input` to find optimized ways to pump data from it to this 120 // output stream. If it finds one, it performs the pump. 
Otherwise, it returns null. 121 // 122 // The default implementation always returns null. 123 124 virtual Promise<void> whenWriteDisconnected() = 0; 125 // Returns a promise that resolves when the stream has become disconnected such that new write()s 126 // will fail with a DISCONNECTED exception. This is particularly useful, for example, to cancel 127 // work early when it is detected that no one will receive the result. 128 // 129 // Note that not all streams are able to detect this condition without actually performing a 130 // write(); such stream implementations may return a promise that never resolves. (In particular, 131 // as of this writing, whenWriteDisconnected() is not implemented on Windows. Also, for TCP 132 // streams, not all disconnects are detectable -- a power or network failure may lead the 133 // connection to hang forever, or until configured socket options lead to a timeout.) 134 // 135 // Unlike most other asynchronous stream methods, it is safe to call whenWriteDisconnected() 136 // multiple times without canceling the previous promises. 137 }; 138 139 class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream { 140 // A combination input and output stream. 141 142 public: 143 virtual void shutdownWrite() = 0; 144 // Cleanly shut down just the write end of the stream, while keeping the read end open. 145 146 virtual void abortRead() {} 147 // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only 148 // be called when an error has occurred. 149 150 virtual void getsockopt(int level, int option, void* value, uint* length); 151 virtual void setsockopt(int level, int option, const void* value, uint length); 152 // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception 153 // if the stream is not a socket or the option is not appropriate for the socket type. The 154 // default implementations always throw "unimplemented". 
155 156 virtual void getsockname(struct sockaddr* addr, uint* length); 157 virtual void getpeername(struct sockaddr* addr, uint* length); 158 // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented" 159 // exception if the stream is not a socket. The default implementations always throw 160 // "unimplemented". 161 // 162 // Note that we don't provide methods that return NetworkAddress because it usually wouldn't 163 // be useful. You can't connect() to or listen() on these addresses, obviously, because they are 164 // ephemeral addresses for a single connection. 165 166 virtual kj::Maybe<int> getFd() const { return nullptr; } 167 // Get the underlying Unix file descriptor, if any. Returns nullptr if this object actually 168 // isn't wrapping a file descriptor. 169 }; 170 171 class AsyncCapabilityStream: public AsyncIoStream { 172 // An AsyncIoStream that also allows transmitting new stream objects and file descriptors 173 // (capabilities, in the object-capability model sense), in addition to bytes. 174 // 175 // Capabilities can be attached to bytes when they are written. On the receiving end, the read() 176 // that receives the first byte of such a message will also receive the capabilities. 177 // 178 // Note that AsyncIoStream's regular byte-oriented methods can be used on AsyncCapabilityStream, 179 // with the effect of silently dropping any capabilities attached to the respective bytes. E.g. 180 // using `AsyncIoStream::tryRead()` to read bytes that had been sent with `writeWithFds()` will 181 // silently drop the FDs (closing them if appropriate). Also note that pumping a stream with 182 // `pumpTo()` always drops all capabilities attached to the pumped data. (TODO(someday): Do we 183 // want a version of pumpTo() that preserves capabilities?) 184 // 185 // On Unix, KJ provides an implementation based on Unix domain sockets and file descriptor 186 // passing via SCM_RIGHTS. 
Due to the nature of SCM_RIGHTS, if the application accidentally 187 // read()s when it should have called receiveStream(), it will observe a NUL byte in the data 188 // and the capability will be discarded. Of course, an application should not depend on this 189 // behavior; it should avoid read()ing through a capability. 190 // 191 // KJ does not provide any inter-process implementation of this type on Windows, as there's no 192 // obvious implementation there. Handle passing on Windows requires at least one of the processes 193 // involved to have permission to modify the other's handle table, which is effectively full 194 // control. Handle passing between mutually non-trusting processes would require a trusted 195 // broker process to facilitate. One could possibly implement this type in terms of such a 196 // broker, or in terms of direct handle passing if at least one process trusts the other. 197 198 public: 199 virtual Promise<void> writeWithFds(ArrayPtr<const byte> data, 200 ArrayPtr<const ArrayPtr<const byte>> moreData, 201 ArrayPtr<const int> fds) = 0; 202 Promise<void> writeWithFds(ArrayPtr<const byte> data, 203 ArrayPtr<const ArrayPtr<const byte>> moreData, 204 ArrayPtr<const AutoCloseFd> fds); 205 // Write some data to the stream with some file descriptors attached to it. 206 // 207 // The maximum number of FDs that can be sent at a time is usually subject to an OS-imposed 208 // limit. On Linux, this is 253. In practice, sending more than a handful of FDs at once is 209 // probably a bad idea. 210 211 struct ReadResult { 212 size_t byteCount; 213 size_t capCount; 214 }; 215 216 virtual Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes, 217 AutoCloseFd* fdBuffer, size_t maxFds) = 0; 218 // Read data from the stream that may have file descriptors attached. Any attached descriptors 219 // will be placed in `fdBuffer`. 
If multiple bundles of FDs are encountered in the course of 220 // reading the amount of data requested by minBytes/maxBytes, then they will be concatenated. If 221 // more FDs are received than fit in the buffer, then the excess will be discarded and closed -- 222 // this behavior, while ugly, is important to defend against denial-of-service attacks that may 223 // fill up the FD table with garbage. Applications must think carefully about how many FDs they 224 // really need to receive at once and set a well-defined limit. 225 226 virtual Promise<void> writeWithStreams(ArrayPtr<const byte> data, 227 ArrayPtr<const ArrayPtr<const byte>> moreData, 228 Array<Own<AsyncCapabilityStream>> streams) = 0; 229 virtual Promise<ReadResult> tryReadWithStreams( 230 void* buffer, size_t minBytes, size_t maxBytes, 231 Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) = 0; 232 // Like above, but passes AsyncCapabilityStream objects. The stream implementations must be from 233 // the same AsyncIoProvider. 234 235 // --------------------------------------------------------------------------- 236 // Helpers for sending individual capabilities. 237 // 238 // These are equivalent to the above methods with the constraint that only one FD is 239 // sent/received at a time and the corresponding data is a single zero-valued byte. 240 241 Promise<Own<AsyncCapabilityStream>> receiveStream(); 242 Promise<Maybe<Own<AsyncCapabilityStream>>> tryReceiveStream(); 243 Promise<void> sendStream(Own<AsyncCapabilityStream> stream); 244 // Transfer a single stream. 245 246 Promise<AutoCloseFd> receiveFd(); 247 Promise<Maybe<AutoCloseFd>> tryReceiveFd(); 248 Promise<void> sendFd(int fd); 249 // Transfer a single raw file descriptor. 250 }; 251 252 struct OneWayPipe { 253 // A data pipe with an input end and an output end. (Typically backed by pipe() system call.) 
254 255 Own<AsyncInputStream> in; 256 Own<AsyncOutputStream> out; 257 }; 258 259 OneWayPipe newOneWayPipe(kj::Maybe<uint64_t> expectedLength = nullptr); 260 // Constructs a OneWayPipe that operates in-process. The pipe does not do any buffering -- it waits 261 // until both a read() and a write() call are pending, then resolves both. 262 // 263 // If `expectedLength` is non-null, then the pipe will be expected to transmit exactly that many 264 // bytes. The input end's `tryGetLength()` will return the number of bytes left. 265 266 struct TwoWayPipe { 267 // A data pipe that supports sending in both directions. Each end's output sends data to the 268 // other end's input. (Typically backed by socketpair() system call.) 269 270 Own<AsyncIoStream> ends[2]; 271 }; 272 273 TwoWayPipe newTwoWayPipe(); 274 // Constructs a TwoWayPipe that operates in-process. The pipe does not do any buffering -- it waits 275 // until both a read() and a write() call are pending, then resolves both. 276 277 struct CapabilityPipe { 278 // Like TwoWayPipe but allowing capability-passing. 279 280 Own<AsyncCapabilityStream> ends[2]; 281 }; 282 283 CapabilityPipe newCapabilityPipe(); 284 // Like newTwoWayPipe() but creates a capability pipe. 285 // 286 // The requirement of `writeWithStreams()` that "The stream implementations must be from the same 287 // AsyncIoProvider." does not apply to this pipe; any kind of AsyncCapabilityStream implementation 288 // is supported. 289 // 290 // This implementation does not know how to convert streams to FDs or vice versa; if you write FDs 291 // you must read FDs, and if you write streams you must read streams. 292 293 struct Tee { 294 // Two AsyncInputStreams which each read the same data from some wrapped inner AsyncInputStream. 295 296 Own<AsyncInputStream> branches[2]; 297 }; 298 299 Tee newTee(Own<AsyncInputStream> input, uint64_t limit = kj::maxValue); 300 // Constructs a Tee that operates in-process. 
The tee buffers data if any read or pump operations is 301 // called on one of the two input ends. If a read or pump operation is subsequently called on the 302 // other input end, the buffered data is consumed. 303 // 304 // `pumpTo()` operations on the input ends will proactively read from the inner stream and block 305 // while writing to the output stream. While one branch has an active `pumpTo()` operation, any 306 // `tryRead()` operation on the other branch will not be allowed to read faster than allowed by the 307 // pump's backpressure. (In other words, it will never cause buffering on the pump.) Similarly, if 308 // there are `pumpTo()` operations active on both branches, the greater of the two backpressures is 309 // respected -- the two pumps progress in lockstep, and there is no buffering. 310 // 311 // At no point will a branch's buffer be allowed to grow beyond `limit` bytes. If the buffer would 312 // grow beyond the limit, an exception is generated, which both branches see once they have 313 // exhausted their buffers. 314 // 315 // It is recommended that you use a more conservative value for `limit` than the default. 316 317 Own<AsyncOutputStream> newPromisedStream(Promise<Own<AsyncOutputStream>> promise); 318 Own<AsyncIoStream> newPromisedStream(Promise<Own<AsyncIoStream>> promise); 319 // Constructs an Async*Stream which waits for a promise to resolve, then forwards all calls to the 320 // promised stream. 321 322 // ======================================================================================= 323 // Authenticated streams 324 325 class PeerIdentity { 326 // PeerIdentity provides information about a connecting client. Various subclasses exist to 327 // address different network types. 328 public: 329 virtual kj::String toString() = 0; 330 // Returns a human-readable string identifying the peer. Where possible, this string will be 331 // in the same format as the addresses you could pass to `kj::Network::parseAddress()`. 
However, 332 // only certain subclasses of `PeerIdentity` guarantee this property. 333 }; 334 335 struct AuthenticatedStream { 336 // A pair of an `AsyncIoStream` and a `PeerIdentity`. This is used as the return type of 337 // `NetworkAddress::connectAuthenticated()` and `ConnectionReceiver::acceptAuthenticated()`. 338 339 Own<AsyncIoStream> stream; 340 // The byte stream. 341 342 Own<PeerIdentity> peerIdentity; 343 // An object indicating who is at the other end of the stream. 344 // 345 // Different subclasses of `PeerIdentity` are used in different situations: 346 // - TCP connections will use NetworkPeerIdentity, which gives the network address of the client. 347 // - Local (unix) socket connections will use LocalPeerIdentity, which identifies the UID 348 // and PID of the process that initiated the connection. 349 // - TLS connections will use TlsPeerIdentity which provides details of the client certificate, 350 // if any was provided. 351 // - When no meaningful peer identity can be provided, `UnknownPeerIdentity` is returned. 352 // 353 // Implementations of `Network`, `ConnectionReceiver`, `NetworkAddress`, etc. should document the 354 // specific assumptions the caller can make about the type of `PeerIdentity`s used, allowing for 355 // identities to be statically downcast if the right conditions are met. In the absence of 356 // documented promises, RTTI may be needed to query the type. 357 }; 358 359 class NetworkPeerIdentity: public PeerIdentity { 360 // PeerIdentity used for network protocols like TCP/IP. This identifies the remote peer. 361 // 362 // This is only "authenticated" to the extent that we know data written to the stream will be 363 // routed to the given address. This does not preclude the possibility of man-in-the-middle 364 // attacks by attackers who are able to manipulate traffic along the route. 365 public: 366 virtual NetworkAddress& getAddress() = 0; 367 // Obtain the peer's address as a NetworkAddress object. 
The returned reference's lifetime is the 368 // same as the `NetworkPeerIdentity`, but you can always call `clone()` on it to get a copy that 369 // lives longer. 370 371 static kj::Own<NetworkPeerIdentity> newInstance(kj::Own<NetworkAddress> addr); 372 // Construct an instance of this interface wrapping the given address. 373 }; 374 375 class LocalPeerIdentity: public PeerIdentity { 376 // PeerIdentity used for connections between processes on the local machine -- in particular, 377 // Unix sockets. 378 // 379 // (This interface probably isn't useful on Windows.) 380 public: 381 struct Credentials { 382 kj::Maybe<int> pid; 383 kj::Maybe<uint> uid; 384 385 // We don't cover groups at present because some systems produce a list of groups while others 386 // only provide the peer's main group, the latter being pretty useless. 387 }; 388 389 virtual Credentials getCredentials() = 0; 390 // Get the PID and UID of the peer process, if possible. 391 // 392 // Either ID may be null if the peer could not be identified. Some operating systems do not 393 // support retrieving these credentials, or can only provide one or the other. Some situations 394 // (like user and PID namespaces on Linux) may also make it impossible to represent the peer's 395 // credentials accurately. 396 // 397 // Note the meaning here can be subtle. Multiple processes can potentially have the socket in 398 // their file descriptor tables. The identified process is the one who called `connect()` or 399 // `listen()`. 400 // 401 // On Linux this is implemented with SO_PEERCRED. 402 403 static kj::Own<LocalPeerIdentity> newInstance(Credentials creds); 404 // Construct an instance of this interface wrapping the given credentials. 405 }; 406 407 class UnknownPeerIdentity: public PeerIdentity { 408 public: 409 static kj::Own<UnknownPeerIdentity> newInstance(); 410 // Get an instance of this interface. This actually always returns the same instance with no 411 // memory allocation. 
412 }; 413 414 // ======================================================================================= 415 // Accepting connections 416 417 class ConnectionReceiver { 418 // Represents a server socket listening on a port. 419 420 public: 421 virtual Promise<Own<AsyncIoStream>> accept() = 0; 422 // Accept the next incoming connection. 423 424 virtual Promise<AuthenticatedStream> acceptAuthenticated(); 425 // Accept the next incoming connection, and also provide a PeerIdentity with any information 426 // about the client. 427 // 428 // For backwards-compatibility, the default implementation of this method calls `accept()` and 429 // then adds `UnknownPeerIdentity`. 430 431 virtual uint getPort() = 0; 432 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't 433 // specify a port when constructing the NetworkAddress -- one will have been assigned 434 // automatically. 435 436 virtual void getsockopt(int level, int option, void* value, uint* length); 437 virtual void setsockopt(int level, int option, const void* value, uint length); 438 virtual void getsockname(struct sockaddr* addr, uint* length); 439 // Same as the methods of AsyncIoStream. 440 }; 441 442 // ======================================================================================= 443 // Datagram I/O 444 445 class AncillaryMessage { 446 // Represents an ancillary message (aka control message) received using the recvmsg() system 447 // call (or equivalent). Most apps will not use this. 448 449 public: 450 inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data); 451 AncillaryMessage() = default; 452 453 inline int getLevel() const; 454 // Originating protocol / socket level. 455 456 inline int getType() const; 457 // Protocol-specific message type. 458 459 template <typename T> 460 inline Maybe<const T&> as() const; 461 // Interpret the ancillary message as the given struct type. 
Most ancillary messages are some 462 // sort of struct, so this is a convenient way to access it. Returns nullptr if the message 463 // is smaller than the struct -- this can happen if the message was truncated due to 464 // insufficient ancillary buffer space. 465 466 template <typename T> 467 inline ArrayPtr<const T> asArray() const; 468 // Interpret the ancillary message as an array of items. If the message size does not evenly 469 // divide into elements of type T, the remainder is discarded -- this can happen if the message 470 // was truncated due to insufficient ancillary buffer space. 471 472 private: 473 int level; 474 int type; 475 ArrayPtr<const byte> data; 476 // Message data. In most cases you should use `as()` or `asArray()`. 477 }; 478 479 class DatagramReceiver { 480 // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's 481 // capacity in advance; if a received packet is larger than the capacity, it will be truncated. 482 483 public: 484 virtual Promise<void> receive() = 0; 485 // Receive a new message, overwriting this object's content. 486 // 487 // receive() may reuse the same buffers for content and ancillary data with each call. 488 489 template <typename T> 490 struct MaybeTruncated { 491 T value; 492 493 bool isTruncated; 494 // True if the Receiver's capacity was insufficient to receive the value and therefore the 495 // value is truncated. 496 }; 497 498 virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0; 499 // Get the content of the datagram. 500 501 virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0; 502 // Ancillary messages received with the datagram. See the recvmsg() system call and the cmsghdr 503 // struct. Most apps don't need this. 
504 // 505 // If the returned value is truncated, then the last message in the array may itself be 506 // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will 507 // return fewer elements than expected. Truncation can also mean that additional messages were 508 // available but discarded. 509 510 virtual NetworkAddress& getSource() = 0; 511 // Get the datagram sender's address. 512 513 struct Capacity { 514 size_t content = 8192; 515 // How much space to allocate for the datagram content. If a datagram is received that is 516 // larger than this, it will be truncated, with no way to recover the tail. 517 518 size_t ancillary = 0; 519 // How much space to allocate for ancillary messages. As with content, if the ancillary data 520 // is larger than this, it will be truncated. 521 }; 522 }; 523 524 class DatagramPort { 525 public: 526 virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0; 527 virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces, 528 NetworkAddress& destination) = 0; 529 530 virtual Own<DatagramReceiver> makeReceiver( 531 DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0; 532 // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much 533 // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`. 534 535 virtual uint getPort() = 0; 536 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't 537 // specify a port when constructing the NetworkAddress -- one will have been assigned 538 // automatically. 539 540 virtual void getsockopt(int level, int option, void* value, uint* length); 541 virtual void setsockopt(int level, int option, const void* value, uint length); 542 // Same as the methods of AsyncIoStream. 
543 }; 544 545 // ======================================================================================= 546 // Networks 547 548 class NetworkAddress { 549 // Represents a remote address to which the application can connect. 550 551 public: 552 virtual Promise<Own<AsyncIoStream>> connect() = 0; 553 // Make a new connection to this address. 554 // 555 // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number. 556 557 virtual Promise<AuthenticatedStream> connectAuthenticated(); 558 // Connect to the address and return both the connection and information about the peer identity. 559 // This is especially useful when using TLS, to get certificate details. 560 // 561 // For backwards-compatibility, the default implementation of this method calls `connect()` and 562 // then uses a `NetworkPeerIdentity` wrapping a clone of this `NetworkAddress` -- which is not 563 // particularly useful. 564 565 virtual Own<ConnectionReceiver> listen() = 0; 566 // Listen for incoming connections on this address. 567 // 568 // The address must be local. 569 570 virtual Own<DatagramPort> bindDatagramPort(); 571 // Open this address as a datagram (e.g. UDP) port. 572 // 573 // The address must be local. 574 575 virtual Own<NetworkAddress> clone() = 0; 576 // Returns an equivalent copy of this NetworkAddress. 577 578 virtual String toString() = 0; 579 // Produce a human-readable string which hopefully can be passed to Network::parseAddress() 580 // to reproduce this address, although whether or not that works of course depends on the Network 581 // implementation. This should be called only to display the address to human users, who will 582 // hopefully know what they are able to do with it. 583 }; 584 585 class Network { 586 // Factory for NetworkAddress instances, representing the network services offered by the 587 // operating system. 
588 // 589 // This interface typically represents broad authority, and well-designed code should limit its 590 // use to high-level startup code and user interaction. Low-level APIs should accept 591 // NetworkAddress instances directly and work from there, if at all possible. 592 593 public: 594 virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0; 595 // Construct a network address from a user-provided string. The format of the address 596 // strings is not specified at the API level, and application code should make no assumptions 597 // about them. These strings should always be provided by humans, and said humans will know 598 // what format to use in their particular context. 599 // 600 // `portHint`, if provided, specifies the "standard" IP port number for the application-level 601 // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a 602 // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is 603 // omitted, then the returned address will only support listen() and bindDatagramPort() 604 // (not connect()), and an unused port will be chosen each time one of those methods is called. 605 606 virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0; 607 // Construct a network address from a legacy struct sockaddr. 608 609 virtual Own<Network> restrictPeers( 610 kj::ArrayPtr<const kj::StringPtr> allow, 611 kj::ArrayPtr<const kj::StringPtr> deny = nullptr) KJ_WARN_UNUSED_RESULT = 0; 612 // Constructs a new Network instance wrapping this one which restricts which peer addresses are 613 // permitted (both for outgoing and incoming connections). 614 // 615 // Communication will be allowed only with peers whose addresses match one of the patterns 616 // specified in the `allow` array. 
If a `deny` array is specified, then any address which matches 617 // a pattern in `deny` and *does not* match any more-specific pattern in `allow` will also be 618 // denied. 619 // 620 // The syntax of address patterns depends on the network, except that three special patterns are 621 // defined for all networks: 622 // - "private": Matches network addresses that are reserved by standards for private networks, 623 // such as "10.0.0.0/8" or "192.168.0.0/16". This is a superset of "local". 624 // - "public": Opposite of "private". 625 // - "local": Matches network addresses that are defined by standards to only be accessible from 626 // the local machine, such as "127.0.0.0/8" or Unix domain addresses. 627 // - "network": Opposite of "local". 628 // 629 // For the standard KJ network implementation, the following patterns are also recognized: 630 // - Network blocks specified in CIDR notation (ipv4 and ipv6), such as "192.0.2.0/24" or 631 // "2001:db8::/32". 632 // - "unix" to match all Unix domain addresses. (In the future, we may support specifying a 633 // glob.) 634 // - "unix-abstract" to match Linux's "abstract unix domain" addresses. (In the future, we may 635 // support specifying a glob.) 636 // 637 // Network restrictions apply *after* DNS resolution (otherwise they'd be useless). 638 // 639 // It is legal to parseAddress() a restricted address. An exception won't be thrown until 640 // connect() is called. 641 // 642 // It's possible to listen() on a restricted address. However, connections will only be accepted 643 // from non-restricted addresses; others will be dropped. If a particular listen address has no 644 // valid peers (e.g. because it's a unix socket address and unix sockets are not allowed) then 645 // listen() may throw (or may simply never receive any connections). 646 // 647 // Examples: 648 // 649 // auto restricted = network->restrictPeers({"public"}); 650 // 651 // Allows connections only to/from public internet addresses. 
Use this when connecting to an 652 // address specified by a third party that is not trusted and is not themselves already on your 653 // private network. 654 // 655 // auto restricted = network->restrictPeers({"private"}); 656 // 657 // Allows connections only to/from the private network. Use this on the server side to reject 658 // connections from the public internet. 659 // 660 // auto restricted = network->restrictPeers({"192.0.2.0/24"}, {"192.0.2.3/32"}); 661 // 662 // Allows connections only to/from 192.0.2.*, except 192.0.2.3 which is blocked. 663 // 664 // auto restricted = network->restrictPeers({"10.0.0.0/8", "10.1.2.3/32"}, {"10.1.2.0/24"}); 665 // 666 // Allows connections to/from 10.*.*.*, with the exception of 10.1.2.* (which is denied), with an 667 // exception to the exception of 10.1.2.3 (which is allowed, because it is matched by an allow 668 // rule that is more specific than the deny rule). 669 }; 670 671 // ======================================================================================= 672 // I/O Provider 673 674 class AsyncIoProvider { 675 // Class which constructs asynchronous wrappers around the operating system's I/O facilities. 676 // 677 // Generally, the implementation of this interface must integrate closely with a particular 678 // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide 679 // an AsyncIoProvider. 680 681 public: 682 virtual OneWayPipe newOneWayPipe() = 0; 683 // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with 684 // the pipe(2) system call). 685 686 virtual TwoWayPipe newTwoWayPipe() = 0; 687 // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with 688 // socketpair(2) system call). Data written to one end can be read from the other. 689 690 virtual CapabilityPipe newCapabilityPipe(); 691 // Creates two AsyncCapabilityStreams representing the two ends of a two-way capability pipe. 
692 // 693 // The default implementation throws an unimplemented exception. In particular this is not 694 // implemented by the default AsyncIoProvider on Windows, since Windows lacks any sane way to 695 // pass handles over a stream. 696 697 virtual Network& getNetwork() = 0; 698 // Creates a new `Network` instance representing the networks exposed by the operating system. 699 // 700 // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If 701 // you call this from low-level code, then you are preventing higher-level code from injecting an 702 // alternative implementation. Instead, if your code needs to use network functionality, it 703 // should ask for a `Network` as a constructor or method parameter, so that higher-level code can 704 // chose what implementation to use. The system network is essentially a singleton. See: 705 // http://www.object-oriented-security.org/lets-argue/singletons 706 // 707 // Code that uses the system network should not make any assumptions about what kinds of 708 // addresses it will parse, as this could differ across platforms. String addresses should come 709 // strictly from the user, who will know how to write them correctly for their system. 710 // 711 // With that said, KJ currently supports the following string address formats: 712 // - IPv4: "1.2.3.4", "1.2.3.4:80" 713 // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80" 714 // - Local IP wildcard (covers both v4 and v6): "*", "*:80" 715 // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http" 716 // - Unix domain: "unix:/path/to/socket" 717 718 struct PipeThread { 719 // A combination of a thread and a two-way pipe that communicates with that thread. 720 // 721 // The fields are intentionally ordered so that the pipe will be destroyed (and therefore 722 // disconnected) before the thread is destroyed (and therefore joined). 
Thus if the thread 723 // arranges to exit when it detects disconnect, destruction should be clean. 724 725 Own<Thread> thread; 726 Own<AsyncIoStream> pipe; 727 }; 728 729 virtual PipeThread newPipeThread( 730 Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0; 731 // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate 732 // with it. One end of the pipe is passed to the thread's start function and the other end of 733 // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will 734 // already have an active `EventLoop` when `startFunc` is called. 735 // 736 // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too 737 // much at once but I'm not sure how to cleanly break it down. 738 739 virtual Timer& getTimer() = 0; 740 // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- 741 // it only updates when the event loop polls for system events. This means that calling `now()` 742 // on this timer does not require a system call. 743 // 744 // This timer is not affected by changes to the system date. It is unspecified whether the timer 745 // continues to count while the system is suspended. 746 }; 747 748 class LowLevelAsyncIoProvider { 749 // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on 750 // different operating systems. You should prefer to use `AsyncIoProvider` over this interface 751 // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection. 752 // 753 // On Unix, this interface can be used to import native file descriptors into the async framework. 754 // Different implementations of this interface might work on top of different event handling 755 // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library. 
756 // 757 // On Windows, this interface can be used to import native SOCKETs into the async framework. 758 // Different implementations of this interface might work on top of different event handling 759 // primitives, such as I/O completion ports vs. completion routines. 760 761 public: 762 enum Flags { 763 // Flags controlling how to wrap a file descriptor. 764 765 TAKE_OWNERSHIP = 1 << 0, 766 // The returned object should own the file descriptor, automatically closing it when destroyed. 767 // The close-on-exec flag will be set on the descriptor if it is not already. 768 // 769 // If this flag is not used, then the file descriptor is not automatically closed and the 770 // close-on-exec flag is not modified. 771 772 #if !_WIN32 773 ALREADY_CLOEXEC = 1 << 1, 774 // Indicates that the close-on-exec flag is known already to be set, so need not be set again. 775 // Only relevant when combined with TAKE_OWNERSHIP. 776 // 777 // On Linux, all system calls which yield new file descriptors have flags or variants which 778 // set the close-on-exec flag immediately. Unfortunately, other OS's do not. 779 780 ALREADY_NONBLOCK = 1 << 2 781 // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag 782 // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode 783 // automatically. 784 // 785 // On Linux, all system calls which yield new file descriptors have flags or variants which 786 // enable non-blocking mode immediately. Unfortunately, other OS's do not. 787 #endif 788 }; 789 790 #if _WIN32 791 typedef uintptr_t Fd; 792 typedef AutoCloseHandle OwnFd; 793 // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the 794 // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify 795 // explicitly). 796 #else 797 typedef int Fd; 798 typedef AutoCloseFd OwnFd; 799 // On Unix, any arbitrary file descriptor is supported. 
800 #endif 801 802 virtual Own<AsyncInputStream> wrapInputFd(Fd fd, uint flags = 0) = 0; 803 // Create an AsyncInputStream wrapping a file descriptor. 804 // 805 // `flags` is a bitwise-OR of the values of the `Flags` enum. 806 807 virtual Own<AsyncOutputStream> wrapOutputFd(Fd fd, uint flags = 0) = 0; 808 // Create an AsyncOutputStream wrapping a file descriptor. 809 // 810 // `flags` is a bitwise-OR of the values of the `Flags` enum. 811 812 virtual Own<AsyncIoStream> wrapSocketFd(Fd fd, uint flags = 0) = 0; 813 // Create an AsyncIoStream wrapping a socket file descriptor. 814 // 815 // `flags` is a bitwise-OR of the values of the `Flags` enum. 816 817 #if !_WIN32 818 virtual Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0); 819 // Like wrapSocketFd() but also support capability passing via SCM_RIGHTS. The socket must be 820 // a Unix domain socket. 821 // 822 // The default implementation throws UNIMPLEMENTED, for backwards-compatibility with 823 // LowLevelAsyncIoProvider implementations written before this method was added. 824 #endif 825 826 virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd( 827 Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0; 828 // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address. 829 // The returned promise does not resolve until connection has completed. 830 // 831 // `flags` is a bitwise-OR of the values of the `Flags` enum. 832 833 class NetworkFilter { 834 public: 835 virtual bool shouldAllow(const struct sockaddr* addr, uint addrlen) = 0; 836 // Returns true if incoming connections or datagrams from the given peer should be accepted. 837 // If false, they will be dropped. This is used to implement kj::Network::restrictPeers(). 
838 839 static NetworkFilter& getAllAllowed(); 840 }; 841 842 virtual Own<ConnectionReceiver> wrapListenSocketFd( 843 Fd fd, NetworkFilter& filter, uint flags = 0) = 0; 844 inline Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) { 845 return wrapListenSocketFd(fd, NetworkFilter::getAllAllowed(), flags); 846 } 847 // Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already 848 // have had `bind()` and `listen()` called on it, so it's ready for `accept()`. 849 // 850 // `flags` is a bitwise-OR of the values of the `Flags` enum. 851 852 virtual Own<DatagramPort> wrapDatagramSocketFd(Fd fd, NetworkFilter& filter, uint flags = 0); 853 inline Own<DatagramPort> wrapDatagramSocketFd(Fd fd, uint flags = 0) { 854 return wrapDatagramSocketFd(fd, NetworkFilter::getAllAllowed(), flags); 855 } 856 857 virtual Timer& getTimer() = 0; 858 // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- 859 // it only updates when the event loop polls for system events. This means that calling `now()` 860 // on this timer does not require a system call. 861 // 862 // This timer is not affected by changes to the system date. It is unspecified whether the timer 863 // continues to count while the system is suspended. 
864 865 Own<AsyncInputStream> wrapInputFd(OwnFd&& fd, uint flags = 0); 866 Own<AsyncOutputStream> wrapOutputFd(OwnFd&& fd, uint flags = 0); 867 Own<AsyncIoStream> wrapSocketFd(OwnFd&& fd, uint flags = 0); 868 #if !_WIN32 869 Own<AsyncCapabilityStream> wrapUnixSocketFd(OwnFd&& fd, uint flags = 0); 870 #endif 871 Promise<Own<AsyncIoStream>> wrapConnectingSocketFd( 872 OwnFd&& fd, const struct sockaddr* addr, uint addrlen, uint flags = 0); 873 Own<ConnectionReceiver> wrapListenSocketFd( 874 OwnFd&& fd, NetworkFilter& filter, uint flags = 0); 875 Own<ConnectionReceiver> wrapListenSocketFd(OwnFd&& fd, uint flags = 0); 876 Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, NetworkFilter& filter, uint flags = 0); 877 Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, uint flags = 0); 878 // Convenience wrappers which transfer ownership via AutoCloseFd (Unix) or AutoCloseHandle 879 // (Windows). TAKE_OWNERSHIP will be implicitly added to `flags`. 880 }; 881 882 Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel); 883 // Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`. 884 885 struct AsyncIoContext { 886 Own<LowLevelAsyncIoProvider> lowLevelProvider; 887 Own<AsyncIoProvider> provider; 888 WaitScope& waitScope; 889 890 #if _WIN32 891 Win32EventPort& win32EventPort; 892 #else 893 UnixEventPort& unixEventPort; 894 // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This 895 // field will go away at some point when we have a chance to improve these interfaces. 896 #endif 897 }; 898 899 AsyncIoContext setupAsyncIo(); 900 // Convenience method which sets up the current thread with everything it needs to do async I/O. 901 // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for 902 // doing I/O on the host system, so everything is ready for the thread to start making async calls 903 // and waiting on promises. 
904 // 905 // You would typically call this in your main() loop or in the start function of a thread. 906 // Example: 907 // 908 // int main() { 909 // auto ioContext = kj::setupAsyncIo(); 910 // 911 // // Now we can call an async function. 912 // Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com"); 913 // 914 // // And we can wait for the promise to complete. Note that you can only use `wait()` 915 // // from the top level, not from inside a promise callback. 916 // String text = textPromise.wait(ioContext.waitScope); 917 // print(text); 918 // return 0; 919 // } 920 // 921 // WARNING: An AsyncIoContext can only be used in the thread and process that created it. In 922 // particular, note that after a fork(), an AsyncIoContext created in the parent process will 923 // not work correctly in the child, even if the parent ceases to use its copy. In particular 924 // note that this means that server processes which daemonize themselves at startup must wait 925 // until after daemonization to create an AsyncIoContext. 926 927 // ======================================================================================= 928 // Convenience adapters. 929 930 class CapabilityStreamConnectionReceiver final: public ConnectionReceiver { 931 // Trivial wrapper which allows an AsyncCapabilityStream to act as a ConnectionReceiver. accept() 932 // calls receiveStream(). 933 934 public: 935 CapabilityStreamConnectionReceiver(AsyncCapabilityStream& inner) 936 : inner(inner) {} 937 938 Promise<Own<AsyncIoStream>> accept() override; 939 uint getPort() override; 940 941 Promise<AuthenticatedStream> acceptAuthenticated() override; 942 // Always produces UnknownIdentity. Capability-based security patterns should not rely on 943 // authenticating peers; the other end of the capability stream should only be given to 944 // authorized parties in the first place. 
945 946 private: 947 AsyncCapabilityStream& inner; 948 }; 949 950 class CapabilityStreamNetworkAddress final: public NetworkAddress { 951 // Trivial wrapper which allows an AsyncCapabilityStream to act as a NetworkAddress. 952 // 953 // connect() is implemented by calling provider.newCapabilityPipe(), sending one end over the 954 // original capability stream, and returning the other end. If `provider` is null, then the 955 // global kj::newCapabilityPipe() will be used, but this ONLY works if `inner` itself is agnostic 956 // to the type of streams it receives, e.g. because it was also created using 957 // kj::NewCapabilityPipe(). 958 // 959 // listen().accept() is implemented by receiving new streams over the original stream. 960 // 961 // Note that clone() doesn't work (due to ownership issues) and toString() returns a static 962 // string. 963 964 public: 965 CapabilityStreamNetworkAddress(kj::Maybe<AsyncIoProvider&> provider, AsyncCapabilityStream& inner) 966 : provider(provider), inner(inner) {} 967 968 Promise<Own<AsyncIoStream>> connect() override; 969 Own<ConnectionReceiver> listen() override; 970 971 Own<NetworkAddress> clone() override; 972 String toString() override; 973 974 Promise<AuthenticatedStream> connectAuthenticated() override; 975 // Always produces UnknownIdentity. Capability-based security patterns should not rely on 976 // authenticating peers; the other end of the capability stream should only be given to 977 // authorized parties in the first place. 
978 979 private: 980 kj::Maybe<AsyncIoProvider&> provider; 981 AsyncCapabilityStream& inner; 982 }; 983 984 // ======================================================================================= 985 // inline implementation details 986 987 inline AncillaryMessage::AncillaryMessage( 988 int level, int type, ArrayPtr<const byte> data) 989 : level(level), type(type), data(data) {} 990 991 inline int AncillaryMessage::getLevel() const { return level; } 992 inline int AncillaryMessage::getType() const { return type; } 993 994 template <typename T> 995 inline Maybe<const T&> AncillaryMessage::as() const { 996 if (data.size() >= sizeof(T)) { 997 return *reinterpret_cast<const T*>(data.begin()); 998 } else { 999 return nullptr; 1000 } 1001 } 1002 1003 template <typename T> 1004 inline ArrayPtr<const T> AncillaryMessage::asArray() const { 1005 return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T)); 1006 } 1007 1008 } // namespace kj 1009 1010 KJ_END_HEADER