readiness-io.c++ (5022B)
1 // Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #include "readiness-io.h" 23 24 namespace kj { 25 26 static size_t copyInto(kj::ArrayPtr<byte> dst, kj::ArrayPtr<const byte>& src) { 27 size_t n = kj::min(dst.size(), src.size()); 28 memcpy(dst.begin(), src.begin(), n); 29 src = src.slice(n, src.size()); 30 return n; 31 } 32 33 // ======================================================================================= 34 35 ReadyInputStreamWrapper::ReadyInputStreamWrapper(AsyncInputStream& input): input(input) {} 36 ReadyInputStreamWrapper::~ReadyInputStreamWrapper() noexcept(false) {} 37 38 kj::Maybe<size_t> ReadyInputStreamWrapper::read(kj::ArrayPtr<byte> dst) { 39 if (eof || dst.size() == 0) return size_t(0); 40 41 if (content.size() == 0) { 42 // No data available. Try to read more. 43 if (!isPumping) { 44 isPumping = true; 45 pumpTask = kj::evalNow([&]() { 46 return input.tryRead(buffer, 1, sizeof(buffer)).then([this](size_t n) { 47 if (n == 0) { 48 eof = true; 49 } else { 50 content = kj::arrayPtr(buffer, n); 51 } 52 isPumping = false; 53 }); 54 }).fork(); 55 } 56 57 return nullptr; 58 } 59 60 return copyInto(dst, content); 61 } 62 63 kj::Promise<void> ReadyInputStreamWrapper::whenReady() { 64 return pumpTask.addBranch(); 65 } 66 67 // ======================================================================================= 68 69 ReadyOutputStreamWrapper::ReadyOutputStreamWrapper(AsyncOutputStream& output): output(output) {} 70 ReadyOutputStreamWrapper::~ReadyOutputStreamWrapper() noexcept(false) {} 71 72 kj::Maybe<size_t> ReadyOutputStreamWrapper::write(kj::ArrayPtr<const byte> data) { 73 if (data.size() == 0) return size_t(0); 74 75 if (filled == sizeof(buffer)) { 76 // No space. 77 return nullptr; 78 } 79 80 uint end = start + filled; 81 size_t result = 0; 82 if (end < sizeof(buffer)) { 83 // The filled part of the buffer is somewhere in the middle. 84 85 // Copy into space after filled space. 86 result += copyInto(kj::arrayPtr(buffer + end, buffer + sizeof(buffer)), data); 87 88 // Copy into space before filled space. 89 result += copyInto(kj::arrayPtr(buffer, buffer + start), data); 90 } else { 91 // Fill currently loops, so we only have one segment of empty space to copy into. 92 93 // Copy into the space between the fill's end and the fill's start. 94 result += copyInto(kj::arrayPtr(buffer + end % sizeof(buffer), buffer + start), data); 95 } 96 97 filled += result; 98 99 if (!isPumping && (!corked || filled == sizeof(buffer))) { 100 isPumping = true; 101 pumpTask = kj::evalNow([&]() { 102 return pump(); 103 }).fork(); 104 } 105 106 return result; 107 } 108 109 kj::Promise<void> ReadyOutputStreamWrapper::whenReady() { 110 return pumpTask.addBranch(); 111 } 112 113 ReadyOutputStreamWrapper::Cork ReadyOutputStreamWrapper::cork() { 114 corked = true; 115 return Cork(*this); 116 } 117 118 void ReadyOutputStreamWrapper::uncork() { 119 corked = false; 120 if (!isPumping && filled > 0) { 121 isPumping = true; 122 pumpTask = kj::evalNow([&]() { 123 return pump(); 124 }).fork(); 125 } 126 } 127 128 kj::Promise<void> ReadyOutputStreamWrapper::pump() { 129 uint oldFilled = filled; 130 uint end = start + filled; 131 132 kj::Promise<void> promise = nullptr; 133 if (end <= sizeof(buffer)) { 134 promise = output.write(buffer + start, filled); 135 } else { 136 end = end % sizeof(buffer); 137 segments[0] = kj::arrayPtr(buffer + start, buffer + sizeof(buffer)); 138 segments[1] = kj::arrayPtr(buffer, buffer + end); 139 promise = output.write(segments); 140 } 141 142 return promise.then([this,oldFilled,end]() -> kj::Promise<void> { 143 filled -= oldFilled; 144 start = end; 145 146 if (filled > 0) { 147 return pump(); 148 } else { 149 isPumping = false; 150 // As a small optimization, reset to the start of the buffer when it's empty so we can provide 151 // the underlying layer just one contiguous chunk of memory instead of two when possible. 152 start = 0; 153 return kj::READY_NOW; 154 } 155 }); 156 } 157 158 } // namespace kj 159