serialize-async.h (9526B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #pragma once 23 24 #include <kj/async-io.h> 25 #include "message.h" 26 27 CAPNP_BEGIN_HEADER 28 29 namespace capnp { 30 31 struct MessageReaderAndFds { 32 kj::Own<MessageReader> reader; 33 kj::ArrayPtr<kj::AutoCloseFd> fds; 34 }; 35 36 class MessageStream { 37 // Interface over which messages can be sent and received; virtualizes 38 // the functionality above. 39 public: 40 virtual kj::Promise<kj::Maybe<MessageReaderAndFds>> tryReadMessage( 41 kj::ArrayPtr<kj::AutoCloseFd> fdSpace, 42 ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr) = 0; 43 // Read a message that may also have file descriptors attached, e.g. from a Unix socket with 44 // SCM_RIGHTS. Returns null on EOF. 45 // 46 // `scratchSpace`, if provided, must remain valid until the returned MessageReader is destroyed. 47 48 kj::Promise<kj::Maybe<kj::Own<MessageReader>>> tryReadMessage( 49 ReaderOptions options = ReaderOptions(), 50 kj::ArrayPtr<word> scratchSpace = nullptr); 51 // Equivalent to the above with fdSpace = nullptr. 52 53 kj::Promise<MessageReaderAndFds> readMessage( 54 kj::ArrayPtr<kj::AutoCloseFd> fdSpace, 55 ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr); 56 kj::Promise<kj::Own<MessageReader>> readMessage( 57 ReaderOptions options = ReaderOptions(), 58 kj::ArrayPtr<word> scratchSpace = nullptr); 59 // Like tryReadMessage, but throws an exception on EOF. 60 61 virtual kj::Promise<void> writeMessage( 62 kj::ArrayPtr<const int> fds, 63 kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) 64 KJ_WARN_UNUSED_RESULT = 0; 65 kj::Promise<void> writeMessage( 66 kj::ArrayPtr<const int> fds, 67 MessageBuilder& builder) 68 KJ_WARN_UNUSED_RESULT; 69 // Write a message with FDs attached, e.g. to a Unix socket with SCM_RIGHTS. 70 // The parameters must remain valid until the returned promise resolves. 71 72 kj::Promise<void> writeMessage( 73 kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) 74 KJ_WARN_UNUSED_RESULT; 75 kj::Promise<void> writeMessage(MessageBuilder& builder) 76 KJ_WARN_UNUSED_RESULT; 77 // Equivalent to the above with fds = nullptr. 78 79 virtual kj::Promise<void> writeMessages( 80 kj::ArrayPtr<kj::ArrayPtr<const kj::ArrayPtr<const word>>> messages) 81 KJ_WARN_UNUSED_RESULT = 0; 82 kj::Promise<void> writeMessages(kj::ArrayPtr<MessageBuilder*> builders) 83 KJ_WARN_UNUSED_RESULT; 84 // Similar to the above, but for writing multiple messages at a time in a batch. 85 86 virtual kj::Maybe<int> getSendBufferSize() = 0; 87 // Get the size of the underlying send buffer, if applicable. The RPC 88 // system uses this as a hint for flow control purposes; see: 89 // 90 // https://capnproto.org/news/2020-04-23-capnproto-0.8.html#multi-stream-flow-control 91 // 92 // ...for a more thorough explanation of how this is used. Implementations 93 // may return nullptr if they do not have access to this information, or if 94 // the underlying transport does not use a congestion window. 95 96 virtual kj::Promise<void> end() = 0; 97 // Cleanly shut down just the write end of the transport, while keeping the read end open. 98 99 }; 100 101 class AsyncIoMessageStream final: public MessageStream { 102 // A MessageStream that wraps an AsyncIoStream. 103 public: 104 explicit AsyncIoMessageStream(kj::AsyncIoStream& stream); 105 106 // Implements MessageStream 107 kj::Promise<kj::Maybe<MessageReaderAndFds>> tryReadMessage( 108 kj::ArrayPtr<kj::AutoCloseFd> fdSpace, 109 ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr) override; 110 kj::Promise<void> writeMessage( 111 kj::ArrayPtr<const int> fds, 112 kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) override; 113 kj::Promise<void> writeMessages( 114 kj::ArrayPtr<kj::ArrayPtr<const kj::ArrayPtr<const word>>> messages) override; 115 kj::Maybe<int> getSendBufferSize() override; 116 117 kj::Promise<void> end() override; 118 private: 119 kj::AsyncIoStream& stream; 120 }; 121 122 class AsyncCapabilityMessageStream final: public MessageStream { 123 // A MessageStream that wraps an AsyncCapabilityStream. 124 public: 125 explicit AsyncCapabilityMessageStream(kj::AsyncCapabilityStream& stream); 126 127 // Implements MessageStream 128 kj::Promise<kj::Maybe<MessageReaderAndFds>> tryReadMessage( 129 kj::ArrayPtr<kj::AutoCloseFd> fdSpace, 130 ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr) override; 131 kj::Promise<void> writeMessage( 132 kj::ArrayPtr<const int> fds, 133 kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) override; 134 kj::Promise<void> writeMessages( 135 kj::ArrayPtr<kj::ArrayPtr<const kj::ArrayPtr<const word>>> messages) override; 136 kj::Maybe<int> getSendBufferSize() override; 137 kj::Promise<void> end() override; 138 private: 139 kj::AsyncCapabilityStream& stream; 140 }; 141 142 // ----------------------------------------------------------------------------- 143 // Stand-alone functions for reading & writing messages on AsyncInput/AsyncOutputStreams. 144 // 145 // In general, foo(stream, ...) is equivalent to 146 // AsyncIoMessageStream(stream).foo(...), whenever the latter would type check. 147 // 148 // The first argument must remain valid until the returned promise resolves 149 // (or is canceled). 150 151 kj::Promise<kj::Own<MessageReader>> readMessage( 152 kj::AsyncInputStream& input, ReaderOptions options = ReaderOptions(), 153 kj::ArrayPtr<word> scratchSpace = nullptr); 154 155 kj::Promise<kj::Maybe<kj::Own<MessageReader>>> tryReadMessage( 156 kj::AsyncInputStream& input, ReaderOptions options = ReaderOptions(), 157 kj::ArrayPtr<word> scratchSpace = nullptr); 158 159 kj::Promise<void> writeMessage(kj::AsyncOutputStream& output, 160 kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) 161 KJ_WARN_UNUSED_RESULT; 162 163 kj::Promise<void> writeMessage(kj::AsyncOutputStream& output, MessageBuilder& builder) 164 KJ_WARN_UNUSED_RESULT; 165 166 // ----------------------------------------------------------------------------- 167 // Stand-alone versions that support FD passing. 168 // 169 // For each of these, `foo(stream, ...)` is equivalent to 170 // `AsyncCapabilityMessageStream(stream).foo(...)`. 171 172 kj::Promise<MessageReaderAndFds> readMessage( 173 kj::AsyncCapabilityStream& input, kj::ArrayPtr<kj::AutoCloseFd> fdSpace, 174 ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr); 175 176 kj::Promise<kj::Maybe<MessageReaderAndFds>> tryReadMessage( 177 kj::AsyncCapabilityStream& input, kj::ArrayPtr<kj::AutoCloseFd> fdSpace, 178 ReaderOptions options = ReaderOptions(), kj::ArrayPtr<word> scratchSpace = nullptr); 179 180 kj::Promise<void> writeMessage(kj::AsyncCapabilityStream& output, kj::ArrayPtr<const int> fds, 181 kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) 182 KJ_WARN_UNUSED_RESULT; 183 kj::Promise<void> writeMessage(kj::AsyncCapabilityStream& output, kj::ArrayPtr<const int> fds, 184 MessageBuilder& builder) 185 KJ_WARN_UNUSED_RESULT; 186 187 188 // ----------------------------------------------------------------------------- 189 // Stand-alone functions for writing multiple messages at once on AsyncOutputStreams. 190 191 kj::Promise<void> writeMessages(kj::AsyncOutputStream& output, 192 kj::ArrayPtr<kj::ArrayPtr<const kj::ArrayPtr<const word>>> messages) 193 KJ_WARN_UNUSED_RESULT; 194 195 kj::Promise<void> writeMessages( 196 kj::AsyncOutputStream& output, kj::ArrayPtr<MessageBuilder*> builders) 197 KJ_WARN_UNUSED_RESULT; 198 199 // ======================================================================================= 200 // inline implementation details 201 202 inline kj::Promise<void> writeMessage(kj::AsyncOutputStream& output, MessageBuilder& builder) { 203 return writeMessage(output, builder.getSegmentsForOutput()); 204 } 205 inline kj::Promise<void> writeMessage( 206 kj::AsyncCapabilityStream& output, kj::ArrayPtr<const int> fds, MessageBuilder& builder) { 207 return writeMessage(output, fds, builder.getSegmentsForOutput()); 208 } 209 210 inline kj::Promise<void> MessageStream::writeMessage(kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) { 211 return writeMessage(nullptr, segments); 212 } 213 214 inline kj::Promise<void> MessageStream::writeMessage(MessageBuilder& builder) { 215 return writeMessage(builder.getSegmentsForOutput()); 216 } 217 218 inline kj::Promise<void> MessageStream::writeMessage( 219 kj::ArrayPtr<const int> fds, MessageBuilder& builder) { 220 return writeMessage(fds, builder.getSegmentsForOutput()); 221 } 222 223 } // namespace capnp 224 225 CAPNP_END_HEADER