capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

serialize-async.h (9526B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #pragma once
     23 
     24 #include <kj/async-io.h>
     25 #include "message.h"
     26 
     27 CAPNP_BEGIN_HEADER
     28 
     29 namespace capnp {
     30 
     31 struct MessageReaderAndFds {
     32   kj::Own<MessageReader> reader;
     33   kj::ArrayPtr<kj::AutoCloseFd> fds;
     34 };
     35 
     36 class MessageStream {
     37   // Interface over which messages can be sent and received; virtualizes
     38   // the functionality above.
     39 public:
     40   virtual kj::Promise<kj::Maybe<MessageReaderAndFds>> tryReadMessage(
     41       kj::ArrayPtr<kj::AutoCloseFd> fdSpace,
     42       ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr) = 0;
     43   // Read a message that may also have file descriptors attached, e.g. from a Unix socket with
     44   // SCM_RIGHTS. Returns null on EOF.
     45   //
     46   // `scratchSpace`, if provided, must remain valid until the returned MessageReader is destroyed.
     47 
     48   kj::Promise<kj::Maybe<kj::Own<MessageReader>>> tryReadMessage(
     49       ReaderOptions options = ReaderOptions(),
     50       kj::ArrayPtr<word> scratchSpace = nullptr);
     51   // Equivalent to the above with fdSpace = nullptr.
     52 
     53   kj::Promise<MessageReaderAndFds> readMessage(
     54       kj::ArrayPtr<kj::AutoCloseFd> fdSpace,
     55       ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr);
     56   kj::Promise<kj::Own<MessageReader>> readMessage(
     57       ReaderOptions options = ReaderOptions(),
     58       kj::ArrayPtr<word> scratchSpace = nullptr);
     59   // Like tryReadMessage, but throws an exception on EOF.
     60 
     61   virtual kj::Promise<void> writeMessage(
     62       kj::ArrayPtr<const int> fds,
     63       kj::ArrayPtr<const kj::ArrayPtr<const word>> segments)
     64     KJ_WARN_UNUSED_RESULT = 0;
     65   kj::Promise<void> writeMessage(
     66       kj::ArrayPtr<const int> fds,
     67       MessageBuilder& builder)
     68     KJ_WARN_UNUSED_RESULT;
     69   // Write a message with FDs attached, e.g. to a Unix socket with SCM_RIGHTS.
     70   // The parameters must remain valid until the returned promise resolves.
     71 
     72   kj::Promise<void> writeMessage(
     73       kj::ArrayPtr<const kj::ArrayPtr<const word>> segments)
     74     KJ_WARN_UNUSED_RESULT;
     75   kj::Promise<void> writeMessage(MessageBuilder& builder)
     76       KJ_WARN_UNUSED_RESULT;
     77   // Equivalent to the above with fds = nullptr.
     78 
     79   virtual kj::Promise<void> writeMessages(
     80       kj::ArrayPtr<kj::ArrayPtr<const kj::ArrayPtr<const word>>> messages)
     81     KJ_WARN_UNUSED_RESULT = 0;
     82   kj::Promise<void> writeMessages(kj::ArrayPtr<MessageBuilder*> builders)
     83       KJ_WARN_UNUSED_RESULT;
     84   // Similar to the above, but for writing multiple messages at a time in a batch.
     85 
     86   virtual kj::Maybe<int> getSendBufferSize() = 0;
     87   // Get the size of the underlying send buffer, if applicable. The RPC
     88   // system uses this as a hint for flow control purposes; see:
     89   //
     90   // https://capnproto.org/news/2020-04-23-capnproto-0.8.html#multi-stream-flow-control
     91   //
     92   // ...for a more thorough explanation of how this is used. Implementations
     93   // may return nullptr if they do not have access to this information, or if
     94   // the underlying transport does not use a congestion window.
     95 
     96   virtual kj::Promise<void> end() = 0;
     97   // Cleanly shut down just the write end of the transport, while keeping the read end open.
     98 
     99 };
    100 
    101 class AsyncIoMessageStream final: public MessageStream {
    102   // A MessageStream that wraps an AsyncIoStream.
    103 public:
    104   explicit AsyncIoMessageStream(kj::AsyncIoStream& stream);
    105 
    106   // Implements MessageStream
    107   kj::Promise<kj::Maybe<MessageReaderAndFds>> tryReadMessage(
    108       kj::ArrayPtr<kj::AutoCloseFd> fdSpace,
    109       ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr) override;
    110   kj::Promise<void> writeMessage(
    111       kj::ArrayPtr<const int> fds,
    112       kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) override;
    113   kj::Promise<void> writeMessages(
    114       kj::ArrayPtr<kj::ArrayPtr<const kj::ArrayPtr<const word>>> messages) override;
    115   kj::Maybe<int> getSendBufferSize() override;
    116 
    117   kj::Promise<void> end() override;
    118 private:
    119   kj::AsyncIoStream& stream;
    120 };
    121 
    122 class AsyncCapabilityMessageStream final: public MessageStream {
    123   // A MessageStream that wraps an AsyncCapabilityStream.
    124 public:
    125   explicit AsyncCapabilityMessageStream(kj::AsyncCapabilityStream& stream);
    126 
    127   // Implements MessageStream
    128   kj::Promise<kj::Maybe<MessageReaderAndFds>> tryReadMessage(
    129       kj::ArrayPtr<kj::AutoCloseFd> fdSpace,
    130       ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr) override;
    131   kj::Promise<void> writeMessage(
    132       kj::ArrayPtr<const int> fds,
    133       kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) override;
    134   kj::Promise<void> writeMessages(
    135       kj::ArrayPtr<kj::ArrayPtr<const kj::ArrayPtr<const word>>> messages) override;
    136   kj::Maybe<int> getSendBufferSize() override;
    137   kj::Promise<void> end() override;
    138 private:
    139   kj::AsyncCapabilityStream& stream;
    140 };
    141 
    142 // -----------------------------------------------------------------------------
    143 // Stand-alone functions for reading & writing messages on AsyncInput/AsyncOutputStreams.
    144 //
    145 // In general, foo(stream, ...) is equivalent to
    146 // AsyncIoMessageStream(stream).foo(...), whenever the latter would type check.
    147 //
    148 // The first argument must remain valid until the returned promise resolves
    149 // (or is canceled).
    150 
    151 kj::Promise<kj::Own<MessageReader>> readMessage(
    152     kj::AsyncInputStream& input, ReaderOptions options = ReaderOptions(),
    153     kj::ArrayPtr<word> scratchSpace = nullptr);
    154 
    155 kj::Promise<kj::Maybe<kj::Own<MessageReader>>> tryReadMessage(
    156     kj::AsyncInputStream& input, ReaderOptions options = ReaderOptions(),
    157     kj::ArrayPtr<word> scratchSpace = nullptr);
    158 
    159 kj::Promise<void> writeMessage(kj::AsyncOutputStream& output,
    160                                kj::ArrayPtr<const kj::ArrayPtr<const word>> segments)
    161     KJ_WARN_UNUSED_RESULT;
    162 
    163 kj::Promise<void> writeMessage(kj::AsyncOutputStream& output, MessageBuilder& builder)
    164     KJ_WARN_UNUSED_RESULT;
    165 
    166 // -----------------------------------------------------------------------------
    167 // Stand-alone versions that support FD passing.
    168 //
    169 // For each of these, `foo(stream, ...)` is equivalent to
    170 // `AsyncCapabilityMessageStream(stream).foo(...)`.
    171 
    172 kj::Promise<MessageReaderAndFds> readMessage(
    173     kj::AsyncCapabilityStream& input, kj::ArrayPtr<kj::AutoCloseFd> fdSpace,
    174     ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr);
    175 
    176 kj::Promise<kj::Maybe<MessageReaderAndFds>> tryReadMessage(
    177     kj::AsyncCapabilityStream& input, kj::ArrayPtr<kj::AutoCloseFd> fdSpace,
    178     ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr);
    179 
    180 kj::Promise<void> writeMessage(kj::AsyncCapabilityStream& output, kj::ArrayPtr<const int> fds,
    181                                kj::ArrayPtr<const kj::ArrayPtr<const word>> segments)
    182     KJ_WARN_UNUSED_RESULT;
    183 kj::Promise<void> writeMessage(kj::AsyncCapabilityStream& output, kj::ArrayPtr<const int> fds,
    184                                MessageBuilder& builder)
    185     KJ_WARN_UNUSED_RESULT;
    186 
    187 
    188 // -----------------------------------------------------------------------------
    189 // Stand-alone functions for writing multiple messages at once on AsyncOutputStreams.
    190 
    191 kj::Promise<void> writeMessages(kj::AsyncOutputStream& output,
    192                                 kj::ArrayPtr<kj::ArrayPtr<const kj::ArrayPtr<const word>>> messages)
    193     KJ_WARN_UNUSED_RESULT;
    194 
    195 kj::Promise<void> writeMessages(
    196     kj::AsyncOutputStream& output, kj::ArrayPtr<MessageBuilder*> builders)
    197     KJ_WARN_UNUSED_RESULT;
    198 
    199 // =======================================================================================
    200 // inline implementation details
    201 
    202 inline kj::Promise<void> writeMessage(kj::AsyncOutputStream& output, MessageBuilder& builder) {
    203   return writeMessage(output, builder.getSegmentsForOutput());
    204 }
    205 inline kj::Promise<void> writeMessage(
    206     kj::AsyncCapabilityStream& output, kj::ArrayPtr<const int> fds, MessageBuilder& builder) {
    207   return writeMessage(output, fds, builder.getSegmentsForOutput());
    208 }
    209 
    210 inline kj::Promise<void> MessageStream::writeMessage(kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) {
    211   return writeMessage(nullptr, segments);
    212 }
    213 
    214 inline kj::Promise<void> MessageStream::writeMessage(MessageBuilder& builder) {
    215   return writeMessage(builder.getSegmentsForOutput());
    216 }
    217 
    218 inline kj::Promise<void> MessageStream::writeMessage(
    219     kj::ArrayPtr<const int> fds, MessageBuilder& builder) {
    220   return writeMessage(fds, builder.getSegmentsForOutput());
    221 }
    222 
    223 }  // namespace capnp
    224 
    225 CAPNP_END_HEADER