readiness-io.h (4577B)
1 // Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #pragma once 23 24 #include <kj/async-io.h> 25 26 namespace kj { 27 28 class ReadyInputStreamWrapper { 29 // Provides readiness-based Async I/O as a wrapper around KJ's standard completion-based API, for 30 // compatibility with libraries that use readiness-based abstractions (e.g. OpenSSL). 31 // 32 // Unfortunately this requires buffering, so is not very efficient. 33 34 public: 35 ReadyInputStreamWrapper(AsyncInputStream& input); 36 ~ReadyInputStreamWrapper() noexcept(false); 37 KJ_DISALLOW_COPY(ReadyInputStreamWrapper); 38 39 kj::Maybe<size_t> read(kj::ArrayPtr<byte> dst); 40 // Reads bytes into `dst`, returning the number of bytes read. Returns zero only at EOF. Returns 41 // nullptr if not ready. 42 43 kj::Promise<void> whenReady(); 44 // Returns a promise that resolves when read() will return non-null. 45 46 private: 47 AsyncInputStream& input; 48 kj::ForkedPromise<void> pumpTask = nullptr; 49 bool isPumping = false; 50 bool eof = false; 51 52 kj::ArrayPtr<const byte> content = nullptr; // Points to currently-valid part of `buffer`. 53 byte buffer[8192]; 54 }; 55 56 class ReadyOutputStreamWrapper { 57 // Provides readiness-based Async I/O as a wrapper around KJ's standard completion-based API, for 58 // compatibility with libraries that use readiness-based abstractions (e.g. OpenSSL). 59 // 60 // Unfortunately this requires buffering, so is not very efficient. 61 62 public: 63 ReadyOutputStreamWrapper(AsyncOutputStream& output); 64 ~ReadyOutputStreamWrapper() noexcept(false); 65 KJ_DISALLOW_COPY(ReadyOutputStreamWrapper); 66 67 kj::Maybe<size_t> write(kj::ArrayPtr<const byte> src); 68 // Writes bytes from `src`, returning the number of bytes written. Never returns zero for 69 // a non-empty `src`. Returns nullptr if not ready. 70 71 kj::Promise<void> whenReady(); 72 // Returns a promise that resolves when write() will return non-null. 73 74 class Cork; 75 // An object that, when destructed, will uncork its parent stream. 76 77 Cork cork(); 78 // After calling, data won't be pumped until either the internal buffer fills up or the returned 79 // object is destructed. Use this if you know multiple small write() calls will be happening in 80 // the near future and want to flush them all at once. 81 // Once the returned object is destructed, behavior goes back to normal. The returned object 82 // must be destructed before the ReadyOutputStreamWrapper. 83 // TODO(perf): This is an ugly hack to avoid sending lots of tiny packets when using TLS, which 84 // has to work around OpenSSL's readiness-based I/O layer. We could certainly do better here. 85 86 private: 87 AsyncOutputStream& output; 88 ArrayPtr<const byte> segments[2]; 89 kj::ForkedPromise<void> pumpTask = nullptr; 90 bool isPumping = false; 91 bool corked = false; 92 93 uint start = 0; // index of first byte 94 uint filled = 0; // number of bytes currently in buffer 95 96 byte buffer[8192]; 97 98 void uncork(); 99 100 kj::Promise<void> pump(); 101 // Asynchronously push the buffer out to the underlying stream. 102 }; 103 104 class ReadyOutputStreamWrapper::Cork { 105 // An object that, when destructed, will uncork its parent stream. 106 public: 107 ~Cork() { 108 KJ_IF_MAYBE(p, parent) { 109 p->uncork(); 110 } 111 } 112 Cork(Cork&& other) : parent(kj::mv(other.parent)) { 113 other.parent = nullptr; 114 } 115 KJ_DISALLOW_COPY(Cork); 116 117 private: 118 Cork(ReadyOutputStreamWrapper& parent) : parent(parent) {} 119 120 kj::Maybe<ReadyOutputStreamWrapper&> parent; 121 friend class ReadyOutputStreamWrapper; 122 }; 123 124 } // namespace kj