capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async-io.h (47860B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #pragma once
     23 
     24 #include "async.h"
     25 #include "function.h"
     26 #include "thread.h"
     27 #include "timer.h"
     28 
     29 KJ_BEGIN_HEADER
     30 
     31 struct sockaddr;
     32 
     33 namespace kj {
     34 
     35 #if _WIN32
     36 class Win32EventPort;
     37 class AutoCloseHandle;
     38 #else
     39 class UnixEventPort;
     40 #endif
     41 
     42 class AutoCloseFd;
     43 class NetworkAddress;
     44 class AsyncOutputStream;
     45 class AsyncIoStream;
     46 class AncillaryMessage;
     47 
     48 // =======================================================================================
     49 // Streaming I/O
     50 
     51 class AsyncInputStream {
     52   // Asynchronous equivalent of InputStream (from io.h).
     53 
     54 public:
     55   virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes);
     56   virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;
     57 
     58   Promise<void> read(void* buffer, size_t bytes);
     59 
     60   virtual Maybe<uint64_t> tryGetLength();
     61   // Get the remaining number of bytes that will be produced by this stream, if known.
     62   //
     63   // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the
     64   // HTTP implementation may need to fall back to Transfer-Encoding: chunked.
     65   //
     66   // The default implementation always returns null.
     67 
     68   virtual Promise<uint64_t> pumpTo(
     69       AsyncOutputStream& output, uint64_t amount = kj::maxValue);
     70   // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the
     71   // total bytes actually pumped (which is only less than `amount` if EOF was reached).
     72   //
     73   // Override this if your stream type knows how to pump itself to certain kinds of output
     74   // streams more efficiently than via the naive approach. You can use
     75   // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match,
     76   // delegate to the default implementation.
     77   //
     78   // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it
     79   // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop.
     80 
     81   Promise<Array<byte>> readAllBytes(uint64_t limit = kj::maxValue);
     82   Promise<String> readAllText(uint64_t limit = kj::maxValue);
     83   // Read until EOF and return as one big byte array or string. Throw an exception if EOF is not
     84   // seen before reading `limit` bytes.
     85   //
     86   // To prevent runaway memory allocation, consider using a more conservative value for `limit` than
     87   // the default, particularly on untrusted data streams which may never see EOF.
     88 
     89   virtual void registerAncillaryMessageHandler(Function<void(ArrayPtr<AncillaryMessage>)> fn);
     90   // Register interest in checking for ancillary messages (aka control messages) when reading.
     91   // The provided callback will be called whenever any are encountered. The messages passed to
     92   // the function do not live beyond when function returns.
     93   // Only supported on Unix (the default impl throws UNIMPLEMENTED). Most apps will not use this.
     94 
     95   virtual Maybe<Own<AsyncInputStream>> tryTee(uint64_t limit = kj::maxValue);
     96   // Primarily intended as an optimization for the `tee` call. Returns an input stream whose state
     97   // is independent from this one but which will return the exact same set of bytes read going
     98   // forward. limit is a total limit on the amount of memory, in bytes, which a tee implementation
     99   // may use to buffer stream data. An implementation must throw an exception if a read operation
    100   // would cause the limit to be exceeded. If tryTee() can see that the new limit is impossible to
    101   // satisfy, it should return nullptr so that the pessimized path is taken in newTee. This is
    102   // likely to arise if tryTee() is called twice with different limits on the same stream.
    103 };
    104 
    105 class AsyncOutputStream {
    106   // Asynchronous equivalent of OutputStream (from io.h).
    107 
    108 public:
    109   virtual Promise<void> write(const void* buffer, size_t size) KJ_WARN_UNUSED_RESULT = 0;
    110   virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces)
    111       KJ_WARN_UNUSED_RESULT = 0;
    112 
    113   virtual Maybe<Promise<uint64_t>> tryPumpFrom(
    114       AsyncInputStream& input, uint64_t amount = kj::maxValue);
    115   // Implements double-dispatch for AsyncInputStream::pumpTo().
    116   //
    117   // This method should only be called from within an implementation of pumpTo().
    118   //
    119   // This method examines the type of `input` to find optimized ways to pump data from it to this
    120   // output stream. If it finds one, it performs the pump. Otherwise, it returns null.
    121   //
    122   // The default implementation always returns null.
    123 
    124   virtual Promise<void> whenWriteDisconnected() = 0;
    125   // Returns a promise that resolves when the stream has become disconnected such that new write()s
    126   // will fail with a DISCONNECTED exception. This is particularly useful, for example, to cancel
    127   // work early when it is detected that no one will receive the result.
    128   //
    129   // Note that not all streams are able to detect this condition without actually performing a
    130   // write(); such stream implementations may return a promise that never resolves. (In particular,
    131   // as of this writing, whenWriteDisconnected() is not implemented on Windows. Also, for TCP
    132   // streams, not all disconnects are detectable -- a power or network failure may lead the
    133   // connection to hang forever, or until configured socket options lead to a timeout.)
    134   //
    135   // Unlike most other asynchronous stream methods, it is safe to call whenWriteDisconnected()
    136   // multiple times without canceling the previous promises.
    137 };
    138 
    139 class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
    140   // A combination input and output stream.
    141 
    142 public:
    143   virtual void shutdownWrite() = 0;
    144   // Cleanly shut down just the write end of the stream, while keeping the read end open.
    145 
    146   virtual void abortRead() {}
    147   // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only
    148   // be called when an error has occurred.
    149 
    150   virtual void getsockopt(int level, int option, void* value, uint* length);
    151   virtual void setsockopt(int level, int option, const void* value, uint length);
    152   // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception
    153   // if the stream is not a socket or the option is not appropriate for the socket type. The
    154   // default implementations always throw "unimplemented".
    155 
    156   virtual void getsockname(struct sockaddr* addr, uint* length);
    157   virtual void getpeername(struct sockaddr* addr, uint* length);
    158   // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented"
    159   // exception if the stream is not a socket. The default implementations always throw
    160   // "unimplemented".
    161   //
    162   // Note that we don't provide methods that return NetworkAddress because it usually wouldn't
    163   // be useful. You can't connect() to or listen() on these addresses, obviously, because they are
    164   // ephemeral addresses for a single connection.
    165 
    166   virtual kj::Maybe<int> getFd() const { return nullptr; }
    167   // Get the underlying Unix file descriptor, if any. Returns nullptr if this object actually
    168   // isn't wrapping a file descriptor.
    169 };
    170 
    171 class AsyncCapabilityStream: public AsyncIoStream {
    172   // An AsyncIoStream that also allows transmitting new stream objects and file descriptors
    173   // (capabilities, in the object-capability model sense), in addition to bytes.
    174   //
    175   // Capabilities can be attached to bytes when they are written. On the receiving end, the read()
    176   // that receives the first byte of such a message will also receive the capabilities.
    177   //
    178   // Note that AsyncIoStream's regular byte-oriented methods can be used on AsyncCapabilityStream,
    179   // with the effect of silently dropping any capabilities attached to the respective bytes. E.g.
    180   // using `AsyncIoStream::tryRead()` to read bytes that had been sent with `writeWithFds()` will
    181   // silently drop the FDs (closing them if appropriate). Also note that pumping a stream with
    182   // `pumpTo()` always drops all capabilities attached to the pumped data. (TODO(someday): Do we
    183   // want a version of pumpTo() that preserves capabilities?)
    184   //
    185   // On Unix, KJ provides an implementation based on Unix domain sockets and file descriptor
    186   // passing via SCM_RIGHTS. Due to the nature of SCM_RIGHTS, if the application accidentally
    187   // read()s when it should have called receiveStream(), it will observe a NUL byte in the data
    188   // and the capability will be discarded. Of course, an application should not depend on this
    189   // behavior; it should avoid read()ing through a capability.
    190   //
    191   // KJ does not provide any inter-process implementation of this type on Windows, as there's no
    192   // obvious implementation there. Handle passing on Windows requires at least one of the processes
    193   // involved to have permission to modify the other's handle table, which is effectively full
    194   // control. Handle passing between mutually non-trusting processes would require a trusted
    195   // broker process to facilitate. One could possibly implement this type in terms of such a
    196   // broker, or in terms of direct handle passing if at least one process trusts the other.
    197 
    198 public:
    199   virtual Promise<void> writeWithFds(ArrayPtr<const byte> data,
    200                                      ArrayPtr<const ArrayPtr<const byte>> moreData,
    201                                      ArrayPtr<const int> fds) = 0;
    202   Promise<void> writeWithFds(ArrayPtr<const byte> data,
    203                              ArrayPtr<const ArrayPtr<const byte>> moreData,
    204                              ArrayPtr<const AutoCloseFd> fds);
    205   // Write some data to the stream with some file descriptors attached to it.
    206   //
    207   // The maximum number of FDs that can be sent at a time is usually subject to an OS-imposed
    208   // limit. On Linux, this is 253. In practice, sending more than a handful of FDs at once is
    209   // probably a bad idea.
    210 
    211   struct ReadResult {
    212     size_t byteCount;
    213     size_t capCount;
    214   };
    215 
    216   virtual Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes,
    217                                              AutoCloseFd* fdBuffer, size_t maxFds) = 0;
    218   // Read data from the stream that may have file descriptors attached. Any attached descriptors
    219   // will be placed in `fdBuffer`. If multiple bundles of FDs are encountered in the course of
    220   // reading the amount of data requested by minBytes/maxBytes, then they will be concatenated. If
    221   // more FDs are received than fit in the buffer, then the excess will be discarded and closed --
    222   // this behavior, while ugly, is important to defend against denial-of-service attacks that may
    223   // fill up the FD table with garbage. Applications must think carefully about how many FDs they
    224   // really need to receive at once and set a well-defined limit.
    225 
    226   virtual Promise<void> writeWithStreams(ArrayPtr<const byte> data,
    227                                          ArrayPtr<const ArrayPtr<const byte>> moreData,
    228                                          Array<Own<AsyncCapabilityStream>> streams) = 0;
    229   virtual Promise<ReadResult> tryReadWithStreams(
    230       void* buffer, size_t minBytes, size_t maxBytes,
    231       Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) = 0;
    232   // Like above, but passes AsyncCapabilityStream objects. The stream implementations must be from
    233   // the same AsyncIoProvider.
    234 
    235   // ---------------------------------------------------------------------------
    236   // Helpers for sending individual capabilities.
    237   //
    238   // These are equivalent to the above methods with the constraint that only one FD is
    239   // sent/received at a time and the corresponding data is a single zero-valued byte.
    240 
    241   Promise<Own<AsyncCapabilityStream>> receiveStream();
    242   Promise<Maybe<Own<AsyncCapabilityStream>>> tryReceiveStream();
    243   Promise<void> sendStream(Own<AsyncCapabilityStream> stream);
    244   // Transfer a single stream.
    245 
    246   Promise<AutoCloseFd> receiveFd();
    247   Promise<Maybe<AutoCloseFd>> tryReceiveFd();
    248   Promise<void> sendFd(int fd);
    249   // Transfer a single raw file descriptor.
    250 };
    251 
    252 struct OneWayPipe {
    253   // A data pipe with an input end and an output end.  (Typically backed by pipe() system call.)
    254 
    255   Own<AsyncInputStream> in;
    256   Own<AsyncOutputStream> out;
    257 };
    258 
    259 OneWayPipe newOneWayPipe(kj::Maybe<uint64_t> expectedLength = nullptr);
    260 // Constructs a OneWayPipe that operates in-process. The pipe does not do any buffering -- it waits
    261 // until both a read() and a write() call are pending, then resolves both.
    262 //
    263 // If `expectedLength` is non-null, then the pipe will be expected to transmit exactly that many
    264 // bytes. The input end's `tryGetLength()` will return the number of bytes left.
    265 
    266 struct TwoWayPipe {
    267   // A data pipe that supports sending in both directions.  Each end's output sends data to the
    268   // other end's input.  (Typically backed by socketpair() system call.)
    269 
    270   Own<AsyncIoStream> ends[2];
    271 };
    272 
    273 TwoWayPipe newTwoWayPipe();
    274 // Constructs a TwoWayPipe that operates in-process. The pipe does not do any buffering -- it waits
    275 // until both a read() and a write() call are pending, then resolves both.
    276 
    277 struct CapabilityPipe {
    278   // Like TwoWayPipe but allowing capability-passing.
    279 
    280   Own<AsyncCapabilityStream> ends[2];
    281 };
    282 
    283 CapabilityPipe newCapabilityPipe();
    284 // Like newTwoWayPipe() but creates a capability pipe.
    285 //
    286 // The requirement of `writeWithStreams()` that "The stream implementations must be from the same
    287 // AsyncIoProvider." does not apply to this pipe; any kind of AsyncCapabilityStream implementation
    288 // is supported.
    289 //
    290 // This implementation does not know how to convert streams to FDs or vice versa; if you write FDs
    291 // you must read FDs, and if you write streams you must read streams.
    292 
    293 struct Tee {
    294   // Two AsyncInputStreams which each read the same data from some wrapped inner AsyncInputStream.
    295 
    296   Own<AsyncInputStream> branches[2];
    297 };
    298 
    299 Tee newTee(Own<AsyncInputStream> input, uint64_t limit = kj::maxValue);
    300 // Constructs a Tee that operates in-process. The tee buffers data if any read or pump operations is
    301 // called on one of the two input ends. If a read or pump operation is subsequently called on the
    302 // other input end, the buffered data is consumed.
    303 //
    304 // `pumpTo()` operations on the input ends will proactively read from the inner stream and block
    305 // while writing to the output stream. While one branch has an active `pumpTo()` operation, any
    306 // `tryRead()` operation on the other branch will not be allowed to read faster than allowed by the
    307 // pump's backpressure. (In other words, it will never cause buffering on the pump.) Similarly, if
    308 // there are `pumpTo()` operations active on both branches, the greater of the two backpressures is
    309 // respected -- the two pumps progress in lockstep, and there is no buffering.
    310 //
    311 // At no point will a branch's buffer be allowed to grow beyond `limit` bytes. If the buffer would
    312 // grow beyond the limit, an exception is generated, which both branches see once they have
    313 // exhausted their buffers.
    314 //
    315 // It is recommended that you use a more conservative value for `limit` than the default.
    316 
    317 Own<AsyncOutputStream> newPromisedStream(Promise<Own<AsyncOutputStream>> promise);
    318 Own<AsyncIoStream> newPromisedStream(Promise<Own<AsyncIoStream>> promise);
    319 // Constructs an Async*Stream which waits for a promise to resolve, then forwards all calls to the
    320 // promised stream.
    321 
    322 // =======================================================================================
    323 // Authenticated streams
    324 
    325 class PeerIdentity {
    326   // PeerIdentity provides information about a connecting client. Various subclasses exist to
    327   // address different network types.
    328 public:
    329   virtual kj::String toString() = 0;
    330   // Returns a human-readable string identifying the peer. Where possible, this string will be
    331   // in the same format as the addresses you could pass to `kj::Network::parseAddress()`. However,
    332   // only certain subclasses of `PeerIdentity` guarantee this property.
    333 };
    334 
    335 struct AuthenticatedStream {
    336   // A pair of an `AsyncIoStream` and a `PeerIdentity`. This is used as the return type of
    337   // `NetworkAddress::connectAuthenticated()` and `ConnectionReceiver::acceptAuthenticated()`.
    338 
    339   Own<AsyncIoStream> stream;
    340   // The byte stream.
    341 
    342   Own<PeerIdentity> peerIdentity;
    343   // An object indicating who is at the other end of the stream.
    344   //
    345   // Different subclasses of `PeerIdentity` are used in different situations:
    346   // - TCP connections will use NetworkPeerIdentity, which gives the network address of the client.
    347   // - Local (unix) socket connections will use LocalPeerIdentity, which identifies the UID
    348   //   and PID of the process that initiated the connection.
    349   // - TLS connections will use TlsPeerIdentity which provides details of the client certificate,
    350   //   if any was provided.
    351   // - When no meaningful peer identity can be provided, `UnknownPeerIdentity` is returned.
    352   //
    353   // Implementations of `Network`, `ConnectionReceiver`, `NetworkAddress`, etc. should document the
    354   // specific assumptions the caller can make about the type of `PeerIdentity`s used, allowing for
    355   // identities to be statically downcast if the right conditions are met. In the absence of
    356   // documented promises, RTTI may be needed to query the type.
    357 };
    358 
    359 class NetworkPeerIdentity: public PeerIdentity {
    360   // PeerIdentity used for network protocols like TCP/IP. This identifies the remote peer.
    361   //
    362   // This is only "authenticated" to the extent that we know data written to the stream will be
    363   // routed to the given address. This does not preclude the possibility of man-in-the-middle
    364   // attacks by attackers who are able to manipulate traffic along the route.
    365 public:
    366   virtual NetworkAddress& getAddress() = 0;
    367   // Obtain the peer's address as a NetworkAddress object. The returned reference's lifetime is the
    368   // same as the `NetworkPeerIdentity`, but you can always call `clone()` on it to get a copy that
    369   // lives longer.
    370 
    371   static kj::Own<NetworkPeerIdentity> newInstance(kj::Own<NetworkAddress> addr);
    372   // Construct an instance of this interface wrapping the given address.
    373 };
    374 
    375 class LocalPeerIdentity: public PeerIdentity {
    376   // PeerIdentity used for connections between processes on the local machine -- in particular,
    377   // Unix sockets.
    378   //
    379   // (This interface probably isn't useful on Windows.)
    380 public:
    381   struct Credentials {
    382     kj::Maybe<int> pid;
    383     kj::Maybe<uint> uid;
    384 
    385     // We don't cover groups at present because some systems produce a list of groups while others
    386     // only provide the peer's main group, the latter being pretty useless.
    387   };
    388 
    389   virtual Credentials getCredentials() = 0;
    390   // Get the PID and UID of the peer process, if possible.
    391   //
    392   // Either ID may be null if the peer could not be identified. Some operating systems do not
    393   // support retrieving these credentials, or can only provide one or the other. Some situations
    394   // (like user and PID namespaces on Linux) may also make it impossible to represent the peer's
    395   // credentials accurately.
    396   //
    397   // Note the meaning here can be subtle. Multiple processes can potentially have the socket in
    398   // their file descriptor tables. The identified process is the one who called `connect()` or
    399   // `listen()`.
    400   //
    401   // On Linux this is implemented with SO_PEERCRED.
    402 
    403   static kj::Own<LocalPeerIdentity> newInstance(Credentials creds);
    404   // Construct an instance of this interface wrapping the given credentials.
    405 };
    406 
    407 class UnknownPeerIdentity: public PeerIdentity {
    408 public:
    409   static kj::Own<UnknownPeerIdentity> newInstance();
    410   // Get an instance of this interface. This actually always returns the same instance with no
    411   // memory allocation.
    412 };
    413 
    414 // =======================================================================================
    415 // Accepting connections
    416 
    417 class ConnectionReceiver {
    418   // Represents a server socket listening on a port.
    419 
    420 public:
    421   virtual Promise<Own<AsyncIoStream>> accept() = 0;
    422   // Accept the next incoming connection.
    423 
    424   virtual Promise<AuthenticatedStream> acceptAuthenticated();
    425   // Accept the next incoming connection, and also provide a PeerIdentity with any information
    426   // about the client.
    427   //
    428   // For backwards-compatibility, the default implementation of this method calls `accept()` and
    429   // then adds `UnknownPeerIdentity`.
    430 
    431   virtual uint getPort() = 0;
    432   // Gets the port number, if applicable (i.e. if listening on IP).  This is useful if you didn't
    433   // specify a port when constructing the NetworkAddress -- one will have been assigned
    434   // automatically.
    435 
    436   virtual void getsockopt(int level, int option, void* value, uint* length);
    437   virtual void setsockopt(int level, int option, const void* value, uint length);
    438   virtual void getsockname(struct sockaddr* addr, uint* length);
    439   // Same as the methods of AsyncIoStream.
    440 };
    441 
    442 // =======================================================================================
    443 // Datagram I/O
    444 
    445 class AncillaryMessage {
    446   // Represents an ancillary message (aka control message) received using the recvmsg() system
    447   // call (or equivalent). Most apps will not use this.
    448 
    449 public:
    450   inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data);
    451   AncillaryMessage() = default;
    452 
    453   inline int getLevel() const;
    454   // Originating protocol / socket level.
    455 
    456   inline int getType() const;
    457   // Protocol-specific message type.
    458 
    459   template <typename T>
    460   inline Maybe<const T&> as() const;
    461   // Interpret the ancillary message as the given struct type. Most ancillary messages are some
    462   // sort of struct, so this is a convenient way to access it. Returns nullptr if the message
    463   // is smaller than the struct -- this can happen if the message was truncated due to
    464   // insufficient ancillary buffer space.
    465 
    466   template <typename T>
    467   inline ArrayPtr<const T> asArray() const;
    468   // Interpret the ancillary message as an array of items. If the message size does not evenly
    469   // divide into elements of type T, the remainder is discarded -- this can happen if the message
    470   // was truncated due to insufficient ancillary buffer space.
    471 
    472 private:
    473   int level;
    474   int type;
    475   ArrayPtr<const byte> data;
    476   // Message data. In most cases you should use `as()` or `asArray()`.
    477 };
    478 
    479 class DatagramReceiver {
    480   // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's
    481   // capacity in advance; if a received packet is larger than the capacity, it will be truncated.
    482 
    483 public:
    484   virtual Promise<void> receive() = 0;
    485   // Receive a new message, overwriting this object's content.
    486   //
    487   // receive() may reuse the same buffers for content and ancillary data with each call.
    488 
    489   template <typename T>
    490   struct MaybeTruncated {
    491     T value;
    492 
    493     bool isTruncated;
    494     // True if the Receiver's capacity was insufficient to receive the value and therefore the
    495     // value is truncated.
    496   };
    497 
    498   virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0;
    499   // Get the content of the datagram.
    500 
    501   virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0;
    502   // Ancillary messages received with the datagram. See the recvmsg() system call and the cmsghdr
    503   // struct. Most apps don't need this.
    504   //
    505   // If the returned value is truncated, then the last message in the array may itself be
    506   // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will
    507   // return fewer elements than expected. Truncation can also mean that additional messages were
    508   // available but discarded.
    509 
    510   virtual NetworkAddress& getSource() = 0;
    511   // Get the datagram sender's address.
    512 
    513   struct Capacity {
    514     size_t content = 8192;
    515     // How much space to allocate for the datagram content. If a datagram is received that is
    516     // larger than this, it will be truncated, with no way to recover the tail.
    517 
    518     size_t ancillary = 0;
    519     // How much space to allocate for ancillary messages. As with content, if the ancillary data
    520     // is larger than this, it will be truncated.
    521   };
    522 };
    523 
    524 class DatagramPort {
    525 public:
    526   virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0;
    527   virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces,
    528                                NetworkAddress& destination) = 0;
    529 
    530   virtual Own<DatagramReceiver> makeReceiver(
    531       DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0;
    532   // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much
    533   // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`.
    534 
    535   virtual uint getPort() = 0;
    536   // Gets the port number, if applicable (i.e. if listening on IP).  This is useful if you didn't
    537   // specify a port when constructing the NetworkAddress -- one will have been assigned
    538   // automatically.
    539 
    540   virtual void getsockopt(int level, int option, void* value, uint* length);
    541   virtual void setsockopt(int level, int option, const void* value, uint length);
    542   // Same as the methods of AsyncIoStream.
    543 };
    544 
    545 // =======================================================================================
    546 // Networks
    547 
    548 class NetworkAddress {
    549   // Represents a remote address to which the application can connect.
    550 
    551 public:
    552   virtual Promise<Own<AsyncIoStream>> connect() = 0;
    553   // Make a new connection to this address.
    554   //
    555   // The address must not be a wildcard ("*").  If it is an IP address, it must have a port number.
    556 
    557   virtual Promise<AuthenticatedStream> connectAuthenticated();
    558   // Connect to the address and return both the connection and information about the peer identity.
    559   // This is especially useful when using TLS, to get certificate details.
    560   //
    561   // For backwards-compatibility, the default implementation of this method calls `connect()` and
    562   // then uses a `NetworkPeerIdentity` wrapping a clone of this `NetworkAddress` -- which is not
    563   // particularly useful.
    564 
    565   virtual Own<ConnectionReceiver> listen() = 0;
    566   // Listen for incoming connections on this address.
    567   //
    568   // The address must be local.
    569 
    570   virtual Own<DatagramPort> bindDatagramPort();
    571   // Open this address as a datagram (e.g. UDP) port.
    572   //
    573   // The address must be local.
    574 
    575   virtual Own<NetworkAddress> clone() = 0;
    576   // Returns an equivalent copy of this NetworkAddress.
    577 
    578   virtual String toString() = 0;
    579   // Produce a human-readable string which hopefully can be passed to Network::parseAddress()
    580   // to reproduce this address, although whether or not that works of course depends on the Network
    581   // implementation.  This should be called only to display the address to human users, who will
    582   // hopefully know what they are able to do with it.
    583 };
    584 
    585 class Network {
    586   // Factory for NetworkAddress instances, representing the network services offered by the
    587   // operating system.
    588   //
    589   // This interface typically represents broad authority, and well-designed code should limit its
    590   // use to high-level startup code and user interaction.  Low-level APIs should accept
    591   // NetworkAddress instances directly and work from there, if at all possible.
    592 
    593 public:
    594   virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0;
    595   // Construct a network address from a user-provided string.  The format of the address
    596   // strings is not specified at the API level, and application code should make no assumptions
    597   // about them.  These strings should always be provided by humans, and said humans will know
    598   // what format to use in their particular context.
    599   //
    600   // `portHint`, if provided, specifies the "standard" IP port number for the application-level
    601   // service in play.  If the address turns out to be an IP address (v4 or v6), and it lacks a
    602   // port number, this port will be used.  If `addr` lacks a port number *and* `portHint` is
    603   // omitted, then the returned address will only support listen() and bindDatagramPort()
    604   // (not connect()), and an unused port will be chosen each time one of those methods is called.
    605 
    606   virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0;
    607   // Construct a network address from a legacy struct sockaddr.
    608 
    609   virtual Own<Network> restrictPeers(
    610       kj::ArrayPtr<const kj::StringPtr> allow,
    611       kj::ArrayPtr<const kj::StringPtr> deny = nullptr) KJ_WARN_UNUSED_RESULT = 0;
    612   // Constructs a new Network instance wrapping this one which restricts which peer addresses are
    613   // permitted (both for outgoing and incoming connections).
    614   //
    615   // Communication will be allowed only with peers whose addresses match one of the patterns
    616   // specified in the `allow` array. If a `deny` array is specified, then any address which matches
    617   // a pattern in `deny` and *does not* match any more-specific pattern in `allow` will also be
    618   // denied.
    619   //
    620   // The syntax of address patterns depends on the network, except that three special patterns are
    621   // defined for all networks:
    622   // - "private": Matches network addresses that are reserved by standards for private networks,
    623   //   such as "10.0.0.0/8" or "192.168.0.0/16". This is a superset of "local".
    624   // - "public": Opposite of "private".
    625   // - "local": Matches network addresses that are defined by standards to only be accessible from
    626   //   the local machine, such as "127.0.0.0/8" or Unix domain addresses.
    627   // - "network": Opposite of "local".
    628   //
    629   // For the standard KJ network implementation, the following patterns are also recognized:
    630   // - Network blocks specified in CIDR notation (ipv4 and ipv6), such as "192.0.2.0/24" or
    631   //   "2001:db8::/32".
    632   // - "unix" to match all Unix domain addresses. (In the future, we may support specifying a
    633   //   glob.)
    634   // - "unix-abstract" to match Linux's "abstract unix domain" addresses. (In the future, we may
    635   //   support specifying a glob.)
    636   //
    637   // Network restrictions apply *after* DNS resolution (otherwise they'd be useless).
    638   //
    639   // It is legal to parseAddress() a restricted address. An exception won't be thrown until
    640   // connect() is called.
    641   //
    642   // It's possible to listen() on a restricted address. However, connections will only be accepted
    643   // from non-restricted addresses; others will be dropped. If a particular listen address has no
    644   // valid peers (e.g. because it's a unix socket address and unix sockets are not allowed) then
    645   // listen() may throw (or may simply never receive any connections).
    646   //
    647   // Examples:
    648   //
    649   //     auto restricted = network->restrictPeers({"public"});
    650   //
    651   // Allows connections only to/from public internet addresses. Use this when connecting to an
    652   // address specified by a third party that is not trusted and is not themselves already on your
    653   // private network.
    654   //
    655   //     auto restricted = network->restrictPeers({"private"});
    656   //
    657   // Allows connections only to/from the private network. Use this on the server side to reject
    658   // connections from the public internet.
    659   //
    660   //     auto restricted = network->restrictPeers({"192.0.2.0/24"}, {"192.0.2.3/32"});
    661   //
    662   // Allows connections only to/from 192.0.2.*, except 192.0.2.3 which is blocked.
    663   //
    664   //     auto restricted = network->restrictPeers({"10.0.0.0/8", "10.1.2.3/32"}, {"10.1.2.0/24"});
    665   //
    666   // Allows connections to/from 10.*.*.*, with the exception of 10.1.2.* (which is denied), with an
    667   // exception to the exception of 10.1.2.3 (which is allowed, because it is matched by an allow
    668   // rule that is more specific than the deny rule).
    669 };
    670 
    671 // =======================================================================================
    672 // I/O Provider
    673 
    674 class AsyncIoProvider {
    675   // Class which constructs asynchronous wrappers around the operating system's I/O facilities.
    676   //
    677   // Generally, the implementation of this interface must integrate closely with a particular
    678   // `EventLoop` implementation.  Typically, the EventLoop implementation itself will provide
    679   // an AsyncIoProvider.
    680 
    681 public:
    682   virtual OneWayPipe newOneWayPipe() = 0;
    683   // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with
    684   // the pipe(2) system call).
    685 
    686   virtual TwoWayPipe newTwoWayPipe() = 0;
    687   // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with
    688   // socketpair(2) system call).  Data written to one end can be read from the other.
    689 
    690   virtual CapabilityPipe newCapabilityPipe();
    691   // Creates two AsyncCapabilityStreams representing the two ends of a two-way capability pipe.
    692   //
    693   // The default implementation throws an unimplemented exception. In particular this is not
    694   // implemented by the default AsyncIoProvider on Windows, since Windows lacks any sane way to
    695   // pass handles over a stream.
    696 
    697   virtual Network& getNetwork() = 0;
    698   // Creates a new `Network` instance representing the networks exposed by the operating system.
    699   //
    700   // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function.  If
    701   // you call this from low-level code, then you are preventing higher-level code from injecting an
    702   // alternative implementation.  Instead, if your code needs to use network functionality, it
    703   // should ask for a `Network` as a constructor or method parameter, so that higher-level code can
    704   // chose what implementation to use.  The system network is essentially a singleton.  See:
    705   //     http://www.object-oriented-security.org/lets-argue/singletons
    706   //
    707   // Code that uses the system network should not make any assumptions about what kinds of
    708   // addresses it will parse, as this could differ across platforms.  String addresses should come
    709   // strictly from the user, who will know how to write them correctly for their system.
    710   //
    711   // With that said, KJ currently supports the following string address formats:
    712   // - IPv4: "1.2.3.4", "1.2.3.4:80"
    713   // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80"
    714   // - Local IP wildcard (covers both v4 and v6):  "*", "*:80"
    715   // - Symbolic names:  "example.com", "example.com:80", "example.com:http", "1.2.3.4:http"
    716   // - Unix domain: "unix:/path/to/socket"
    717 
    718   struct PipeThread {
    719     // A combination of a thread and a two-way pipe that communicates with that thread.
    720     //
    721     // The fields are intentionally ordered so that the pipe will be destroyed (and therefore
    722     // disconnected) before the thread is destroyed (and therefore joined).  Thus if the thread
    723     // arranges to exit when it detects disconnect, destruction should be clean.
    724 
    725     Own<Thread> thread;
    726     Own<AsyncIoStream> pipe;
    727   };
    728 
    729   virtual PipeThread newPipeThread(
    730       Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0;
    731   // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate
    732   // with it.  One end of the pipe is passed to the thread's start function and the other end of
    733   // the pipe is returned.  The new thread also gets its own `AsyncIoProvider` instance and will
    734   // already have an active `EventLoop` when `startFunc` is called.
    735   //
    736   // TODO(someday):  I'm not entirely comfortable with this interface.  It seems to be doing too
    737   //   much at once but I'm not sure how to cleanly break it down.
    738 
    739   virtual Timer& getTimer() = 0;
    740   // Returns a `Timer` based on real time.  Time does not pass while event handlers are running --
    741   // it only updates when the event loop polls for system events.  This means that calling `now()`
    742   // on this timer does not require a system call.
    743   //
    744   // This timer is not affected by changes to the system date.  It is unspecified whether the timer
    745   // continues to count while the system is suspended.
    746 };
    747 
    748 class LowLevelAsyncIoProvider {
    749   // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on
    750   // different operating systems.  You should prefer to use `AsyncIoProvider` over this interface
    751   // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection.
    752   //
    753   // On Unix, this interface can be used to import native file descriptors into the async framework.
    754   // Different implementations of this interface might work on top of different event handling
    755   // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library.
    756   //
    757   // On Windows, this interface can be used to import native SOCKETs into the async framework.
    758   // Different implementations of this interface might work on top of different event handling
    759   // primitives, such as I/O completion ports vs. completion routines.
    760 
    761 public:
    762   enum Flags {
    763     // Flags controlling how to wrap a file descriptor.
    764 
    765     TAKE_OWNERSHIP = 1 << 0,
    766     // The returned object should own the file descriptor, automatically closing it when destroyed.
    767     // The close-on-exec flag will be set on the descriptor if it is not already.
    768     //
    769     // If this flag is not used, then the file descriptor is not automatically closed and the
    770     // close-on-exec flag is not modified.
    771 
    772 #if !_WIN32
    773     ALREADY_CLOEXEC = 1 << 1,
    774     // Indicates that the close-on-exec flag is known already to be set, so need not be set again.
    775     // Only relevant when combined with TAKE_OWNERSHIP.
    776     //
    777     // On Linux, all system calls which yield new file descriptors have flags or variants which
    778     // set the close-on-exec flag immediately.  Unfortunately, other OS's do not.
    779 
    780     ALREADY_NONBLOCK = 1 << 2
    781     // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag
    782     // need not be set again.  Otherwise, all wrap*Fd() methods will enable non-blocking mode
    783     // automatically.
    784     //
    785     // On Linux, all system calls which yield new file descriptors have flags or variants which
    786     // enable non-blocking mode immediately.  Unfortunately, other OS's do not.
    787 #endif
    788   };
    789 
    790 #if _WIN32
    791   typedef uintptr_t Fd;
    792   typedef AutoCloseHandle OwnFd;
    793   // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the
    794   // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify
    795   // explicitly).
    796 #else
    797   typedef int Fd;
    798   typedef AutoCloseFd OwnFd;
    799   // On Unix, any arbitrary file descriptor is supported.
    800 #endif
    801 
    802   virtual Own<AsyncInputStream> wrapInputFd(Fd fd, uint flags = 0) = 0;
    803   // Create an AsyncInputStream wrapping a file descriptor.
    804   //
    805   // `flags` is a bitwise-OR of the values of the `Flags` enum.
    806 
    807   virtual Own<AsyncOutputStream> wrapOutputFd(Fd fd, uint flags = 0) = 0;
    808   // Create an AsyncOutputStream wrapping a file descriptor.
    809   //
    810   // `flags` is a bitwise-OR of the values of the `Flags` enum.
    811 
    812   virtual Own<AsyncIoStream> wrapSocketFd(Fd fd, uint flags = 0) = 0;
    813   // Create an AsyncIoStream wrapping a socket file descriptor.
    814   //
    815   // `flags` is a bitwise-OR of the values of the `Flags` enum.
    816 
    817 #if !_WIN32
    818   virtual Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0);
    819   // Like wrapSocketFd() but also support capability passing via SCM_RIGHTS. The socket must be
    820   // a Unix domain socket.
    821   //
    822   // The default implementation throws UNIMPLEMENTED, for backwards-compatibility with
    823   // LowLevelAsyncIoProvider implementations written before this method was added.
    824 #endif
    825 
    826   virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
    827       Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0;
    828   // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address.
    829   // The returned promise does not resolve until connection has completed.
    830   //
    831   // `flags` is a bitwise-OR of the values of the `Flags` enum.
    832 
    833   class NetworkFilter {
    834   public:
    835     virtual bool shouldAllow(const struct sockaddr* addr, uint addrlen) = 0;
    836     // Returns true if incoming connections or datagrams from the given peer should be accepted.
    837     // If false, they will be dropped. This is used to implement kj::Network::restrictPeers().
    838 
    839     static NetworkFilter& getAllAllowed();
    840   };
    841 
    842   virtual Own<ConnectionReceiver> wrapListenSocketFd(
    843       Fd fd, NetworkFilter& filter, uint flags = 0) = 0;
    844   inline Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) {
    845     return wrapListenSocketFd(fd, NetworkFilter::getAllAllowed(), flags);
    846   }
    847   // Create an AsyncIoStream wrapping a listen socket file descriptor.  This socket should already
    848   // have had `bind()` and `listen()` called on it, so it's ready for `accept()`.
    849   //
    850   // `flags` is a bitwise-OR of the values of the `Flags` enum.
    851 
    852   virtual Own<DatagramPort> wrapDatagramSocketFd(Fd fd, NetworkFilter& filter, uint flags = 0);
    853   inline Own<DatagramPort> wrapDatagramSocketFd(Fd fd, uint flags = 0) {
    854     return wrapDatagramSocketFd(fd, NetworkFilter::getAllAllowed(), flags);
    855   }
    856 
    857   virtual Timer& getTimer() = 0;
    858   // Returns a `Timer` based on real time.  Time does not pass while event handlers are running --
    859   // it only updates when the event loop polls for system events.  This means that calling `now()`
    860   // on this timer does not require a system call.
    861   //
    862   // This timer is not affected by changes to the system date.  It is unspecified whether the timer
    863   // continues to count while the system is suspended.
    864 
    865   Own<AsyncInputStream> wrapInputFd(OwnFd&& fd, uint flags = 0);
    866   Own<AsyncOutputStream> wrapOutputFd(OwnFd&& fd, uint flags = 0);
    867   Own<AsyncIoStream> wrapSocketFd(OwnFd&& fd, uint flags = 0);
    868 #if !_WIN32
    869   Own<AsyncCapabilityStream> wrapUnixSocketFd(OwnFd&& fd, uint flags = 0);
    870 #endif
    871   Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
    872       OwnFd&& fd, const struct sockaddr* addr, uint addrlen, uint flags = 0);
    873   Own<ConnectionReceiver> wrapListenSocketFd(
    874       OwnFd&& fd, NetworkFilter& filter, uint flags = 0);
    875   Own<ConnectionReceiver> wrapListenSocketFd(OwnFd&& fd, uint flags = 0);
    876   Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, NetworkFilter& filter, uint flags = 0);
    877   Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, uint flags = 0);
    878   // Convenience wrappers which transfer ownership via AutoCloseFd (Unix) or AutoCloseHandle
    879   // (Windows). TAKE_OWNERSHIP will be implicitly added to `flags`.
    880 };
    881 
    882 Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel);
    883 // Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`.
    884 
    885 struct AsyncIoContext {
    886   Own<LowLevelAsyncIoProvider> lowLevelProvider;
    887   Own<AsyncIoProvider> provider;
    888   WaitScope& waitScope;
    889 
    890 #if _WIN32
    891   Win32EventPort& win32EventPort;
    892 #else
    893   UnixEventPort& unixEventPort;
    894   // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This
    895   //   field will go away at some point when we have a chance to improve these interfaces.
    896 #endif
    897 };
    898 
    899 AsyncIoContext setupAsyncIo();
    900 // Convenience method which sets up the current thread with everything it needs to do async I/O.
    901 // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for
    902 // doing I/O on the host system, so everything is ready for the thread to start making async calls
    903 // and waiting on promises.
    904 //
    905 // You would typically call this in your main() loop or in the start function of a thread.
    906 // Example:
    907 //
    908 //     int main() {
    909 //       auto ioContext = kj::setupAsyncIo();
    910 //
    911 //       // Now we can call an async function.
    912 //       Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com");
    913 //
    914 //       // And we can wait for the promise to complete.  Note that you can only use `wait()`
    915 //       // from the top level, not from inside a promise callback.
    916 //       String text = textPromise.wait(ioContext.waitScope);
    917 //       print(text);
    918 //       return 0;
    919 //     }
    920 //
    921 // WARNING: An AsyncIoContext can only be used in the thread and process that created it. In
    922 //   particular, note that after a fork(), an AsyncIoContext created in the parent process will
    923 //   not work correctly in the child, even if the parent ceases to use its copy. In particular
    924 //   note that this means that server processes which daemonize themselves at startup must wait
    925 //   until after daemonization to create an AsyncIoContext.
    926 
    927 // =======================================================================================
    928 // Convenience adapters.
    929 
    930 class CapabilityStreamConnectionReceiver final: public ConnectionReceiver {
    931   // Trivial wrapper which allows an AsyncCapabilityStream to act as a ConnectionReceiver. accept()
    932   // calls receiveStream().
    933 
    934 public:
    935   CapabilityStreamConnectionReceiver(AsyncCapabilityStream& inner)
    936       : inner(inner) {}
    937 
    938   Promise<Own<AsyncIoStream>> accept() override;
    939   uint getPort() override;
    940 
    941   Promise<AuthenticatedStream> acceptAuthenticated() override;
    942   // Always produces UnknownIdentity. Capability-based security patterns should not rely on
    943   // authenticating peers; the other end of the capability stream should only be given to
    944   // authorized parties in the first place.
    945 
    946 private:
    947   AsyncCapabilityStream& inner;
    948 };
    949 
    950 class CapabilityStreamNetworkAddress final: public NetworkAddress {
    951   // Trivial wrapper which allows an AsyncCapabilityStream to act as a NetworkAddress.
    952   //
    953   // connect() is implemented by calling provider.newCapabilityPipe(), sending one end over the
    954   // original capability stream, and returning the other end. If `provider` is null, then the
    955   // global kj::newCapabilityPipe() will be used, but this ONLY works if `inner` itself is agnostic
    956   // to the type of streams it receives, e.g. because it was also created using
    957   // kj::NewCapabilityPipe().
    958   //
    959   // listen().accept() is implemented by receiving new streams over the original stream.
    960   //
    961   // Note that clone() doesn't work (due to ownership issues) and toString() returns a static
    962   // string.
    963 
    964 public:
    965   CapabilityStreamNetworkAddress(kj::Maybe<AsyncIoProvider&> provider, AsyncCapabilityStream& inner)
    966       : provider(provider), inner(inner) {}
    967 
    968   Promise<Own<AsyncIoStream>> connect() override;
    969   Own<ConnectionReceiver> listen() override;
    970 
    971   Own<NetworkAddress> clone() override;
    972   String toString() override;
    973 
    974   Promise<AuthenticatedStream> connectAuthenticated() override;
    975   // Always produces UnknownIdentity. Capability-based security patterns should not rely on
    976   // authenticating peers; the other end of the capability stream should only be given to
    977   // authorized parties in the first place.
    978 
    979 private:
    980   kj::Maybe<AsyncIoProvider&> provider;
    981   AsyncCapabilityStream& inner;
    982 };
    983 
    984 // =======================================================================================
    985 // inline implementation details
    986 
    987 inline AncillaryMessage::AncillaryMessage(
    988     int level, int type, ArrayPtr<const byte> data)
    989     : level(level), type(type), data(data) {}
    990 
    991 inline int AncillaryMessage::getLevel() const { return level; }
    992 inline int AncillaryMessage::getType() const { return type; }
    993 
    994 template <typename T>
    995 inline Maybe<const T&> AncillaryMessage::as() const {
    996   if (data.size() >= sizeof(T)) {
    997     return *reinterpret_cast<const T*>(data.begin());
    998   } else {
    999     return nullptr;
   1000   }
   1001 }
   1002 
   1003 template <typename T>
   1004 inline ArrayPtr<const T> AncillaryMessage::asArray() const {
   1005   return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T));
   1006 }
   1007 
   1008 }  // namespace kj
   1009 
   1010 KJ_END_HEADER