readiness-io-test.c++ (8861B)
// Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21 22 #include "readiness-io.h" 23 #include <kj/test.h> 24 #include <stdlib.h> 25 26 namespace kj { 27 namespace { 28 29 KJ_TEST("readiness IO: write small") { 30 auto io = setupAsyncIo(); 31 auto pipe = io.provider->newOneWayPipe(); 32 33 char buf[4]; 34 auto readPromise = pipe.in->read(buf, 3, 4); 35 36 ReadyOutputStreamWrapper out(*pipe.out); 37 KJ_ASSERT(KJ_ASSERT_NONNULL(out.write(kj::StringPtr("foo").asBytes())) == 3); 38 39 KJ_ASSERT(readPromise.wait(io.waitScope) == 3); 40 buf[3] = '\0'; 41 KJ_ASSERT(kj::StringPtr(buf) == "foo"); 42 } 43 44 KJ_TEST("readiness IO: write many odd") { 45 auto io = setupAsyncIo(); 46 auto pipe = io.provider->newOneWayPipe(); 47 48 ReadyOutputStreamWrapper out(*pipe.out); 49 50 size_t totalWritten = 0; 51 for (;;) { 52 KJ_IF_MAYBE(n, out.write(kj::StringPtr("bar").asBytes())) { 53 totalWritten += *n; 54 if (*n < 3) { 55 break; 56 } 57 } else { 58 KJ_FAIL_ASSERT("pipe buffer is divisible by 3? really?"); 59 } 60 } 61 62 auto buf = kj::heapArray<char>(totalWritten + 1); 63 size_t n = pipe.in->read(buf.begin(), totalWritten, buf.size()).wait(io.waitScope); 64 KJ_ASSERT(n == totalWritten); 65 for (size_t i = 0; i < totalWritten; i++) { 66 KJ_ASSERT(buf[i] == "bar"[i%3]); 67 } 68 } 69 70 KJ_TEST("readiness IO: write even") { 71 auto io = setupAsyncIo(); 72 auto pipe = io.provider->newOneWayPipe(); 73 74 ReadyOutputStreamWrapper out(*pipe.out); 75 76 size_t totalWritten = 0; 77 for (;;) { 78 KJ_IF_MAYBE(n, out.write(kj::StringPtr("ba").asBytes())) { 79 totalWritten += *n; 80 if (*n < 2) { 81 KJ_FAIL_ASSERT("pipe buffer is not divisible by 2? 
really?"); 82 } 83 } else { 84 break; 85 } 86 } 87 88 auto buf = kj::heapArray<char>(totalWritten + 1); 89 size_t n = pipe.in->read(buf.begin(), totalWritten, buf.size()).wait(io.waitScope); 90 KJ_ASSERT(n == totalWritten); 91 for (size_t i = 0; i < totalWritten; i++) { 92 KJ_ASSERT(buf[i] == "ba"[i%2]); 93 } 94 } 95 96 KJ_TEST("readiness IO: write while corked") { 97 auto io = setupAsyncIo(); 98 auto pipe = io.provider->newOneWayPipe(); 99 100 char buf[7]; 101 auto readPromise = pipe.in->read(buf, 3, 7); 102 103 ReadyOutputStreamWrapper out(*pipe.out); 104 auto cork = out.cork(); 105 KJ_ASSERT(KJ_ASSERT_NONNULL(out.write(kj::StringPtr("foo").asBytes())) == 3); 106 107 // Data hasn't been written yet. 108 KJ_ASSERT(!readPromise.poll(io.waitScope)); 109 110 // Write some more, and observe it still isn't flushed out yet. 111 KJ_ASSERT(KJ_ASSERT_NONNULL(out.write(kj::StringPtr("bar").asBytes())) == 3); 112 KJ_ASSERT(!readPromise.poll(io.waitScope)); 113 114 // After reenabling pumping, the full read should succeed. 115 // We start this block with `if (true) {` instead of just `{` to avoid g++-8 compiler warnings 116 // telling us that this block isn't treated as part of KJ_ASSERT's internal `for` loop. 117 if (true) { 118 auto tmp = kj::mv(cork); 119 } 120 KJ_ASSERT(readPromise.wait(io.waitScope) == 6); 121 buf[6] = '\0'; 122 KJ_ASSERT(kj::StringPtr(buf) == "foobar"); 123 } 124 125 KJ_TEST("readiness IO: write many odd while corked") { 126 auto io = setupAsyncIo(); 127 auto pipe = io.provider->newOneWayPipe(); 128 129 // The even/odd tests should work just as before even with automatic pumping 130 // corked, since we should still pump when the buffer fills up. 131 ReadyOutputStreamWrapper out(*pipe.out); 132 auto cork = out.cork(); 133 134 size_t totalWritten = 0; 135 for (;;) { 136 KJ_IF_MAYBE(n, out.write(kj::StringPtr("bar").asBytes())) { 137 totalWritten += *n; 138 if (*n < 3) { 139 break; 140 } 141 } else { 142 KJ_FAIL_ASSERT("pipe buffer is divisible by 3? 
really?"); 143 } 144 } 145 146 auto buf = kj::heapArray<char>(totalWritten + 1); 147 size_t n = pipe.in->read(buf.begin(), totalWritten, buf.size()).wait(io.waitScope); 148 KJ_ASSERT(n == totalWritten); 149 for (size_t i = 0; i < totalWritten; i++) { 150 KJ_ASSERT(buf[i] == "bar"[i%3]); 151 } 152 153 // Eager pumping should still be corked. 154 KJ_ASSERT(KJ_ASSERT_NONNULL(out.write(kj::StringPtr("bar").asBytes())) == 3); 155 auto readPromise = pipe.in->read(buf.begin(), 3, buf.size()); 156 KJ_ASSERT(!readPromise.poll(io.waitScope)); 157 } 158 159 KJ_TEST("readiness IO: write many even while corked") { 160 auto io = setupAsyncIo(); 161 auto pipe = io.provider->newOneWayPipe(); 162 163 ReadyOutputStreamWrapper out(*pipe.out); 164 auto cork = out.cork(); 165 166 size_t totalWritten = 0; 167 for (;;) { 168 KJ_IF_MAYBE(n, out.write(kj::StringPtr("ba").asBytes())) { 169 totalWritten += *n; 170 if (*n < 2) { 171 KJ_FAIL_ASSERT("pipe buffer is not divisible by 2? really?"); 172 } 173 } else { 174 break; 175 } 176 } 177 178 auto buf = kj::heapArray<char>(totalWritten + 1); 179 size_t n = pipe.in->read(buf.begin(), totalWritten, buf.size()).wait(io.waitScope); 180 KJ_ASSERT(n == totalWritten); 181 for (size_t i = 0; i < totalWritten; i++) { 182 KJ_ASSERT(buf[i] == "ba"[i%2]); 183 } 184 185 // Eager pumping should still be corked. 
186 KJ_ASSERT(KJ_ASSERT_NONNULL(out.write(kj::StringPtr("ba").asBytes())) == 2); 187 auto readPromise = pipe.in->read(buf.begin(), 2, buf.size()); 188 KJ_ASSERT(!readPromise.poll(io.waitScope)); 189 } 190 191 KJ_TEST("readiness IO: read small") { 192 auto io = setupAsyncIo(); 193 auto pipe = io.provider->newOneWayPipe(); 194 195 ReadyInputStreamWrapper in(*pipe.in); 196 char buf[4]; 197 KJ_ASSERT(in.read(kj::ArrayPtr<char>(buf).asBytes()) == nullptr); 198 199 pipe.out->write("foo", 3).wait(io.waitScope); 200 201 in.whenReady().wait(io.waitScope); 202 KJ_ASSERT(KJ_ASSERT_NONNULL(in.read(kj::ArrayPtr<char>(buf).asBytes())) == 3); 203 buf[3] = '\0'; 204 KJ_ASSERT(kj::StringPtr(buf) == "foo"); 205 206 pipe.out = nullptr; 207 208 kj::Maybe<size_t> finalRead; 209 for (;;) { 210 finalRead = in.read(kj::ArrayPtr<char>(buf).asBytes()); 211 KJ_IF_MAYBE(n, finalRead) { 212 KJ_ASSERT(*n == 0); 213 break; 214 } else { 215 in.whenReady().wait(io.waitScope); 216 } 217 } 218 } 219 220 KJ_TEST("readiness IO: read many odd") { 221 auto io = setupAsyncIo(); 222 auto pipe = io.provider->newOneWayPipe(); 223 224 char dummy[8192]; 225 for (auto i: kj::indices(dummy)) { 226 dummy[i] = "bar"[i%3]; 227 } 228 auto writeTask = pipe.out->write(dummy, sizeof(dummy)).then([&]() { 229 // shutdown 230 pipe.out = nullptr; 231 }).eagerlyEvaluate(nullptr); 232 233 ReadyInputStreamWrapper in(*pipe.in); 234 char buf[3]; 235 236 for (;;) { 237 auto result = in.read(kj::ArrayPtr<char>(buf).asBytes()); 238 KJ_IF_MAYBE(n, result) { 239 for (size_t i = 0; i < *n; i++) { 240 KJ_ASSERT(buf[i] == "bar"[i]); 241 } 242 KJ_ASSERT(*n != 0, "ended at wrong spot"); 243 if (*n < 3) { 244 break; 245 } 246 } else { 247 in.whenReady().wait(io.waitScope); 248 } 249 } 250 251 kj::Maybe<size_t> finalRead; 252 for (;;) { 253 finalRead = in.read(kj::ArrayPtr<char>(buf).asBytes()); 254 KJ_IF_MAYBE(n, finalRead) { 255 KJ_ASSERT(*n == 0); 256 break; 257 } else { 258 in.whenReady().wait(io.waitScope); 259 } 260 } 261 } 262 263 
KJ_TEST("readiness IO: read many even") { 264 auto io = setupAsyncIo(); 265 auto pipe = io.provider->newOneWayPipe(); 266 267 char dummy[8192]; 268 for (auto i: kj::indices(dummy)) { 269 dummy[i] = "ba"[i%2]; 270 } 271 auto writeTask = pipe.out->write(dummy, sizeof(dummy)).then([&]() { 272 // shutdown 273 pipe.out = nullptr; 274 }).eagerlyEvaluate(nullptr); 275 276 ReadyInputStreamWrapper in(*pipe.in); 277 char buf[2]; 278 279 for (;;) { 280 auto result = in.read(kj::ArrayPtr<char>(buf).asBytes()); 281 KJ_IF_MAYBE(n, result) { 282 for (size_t i = 0; i < *n; i++) { 283 KJ_ASSERT(buf[i] == "ba"[i]); 284 } 285 if (*n == 0) { 286 break; 287 } 288 KJ_ASSERT(*n == 2, "ended at wrong spot"); 289 } else { 290 in.whenReady().wait(io.waitScope); 291 } 292 } 293 294 kj::Maybe<size_t> finalRead; 295 for (;;) { 296 finalRead = in.read(kj::ArrayPtr<char>(buf).asBytes()); 297 KJ_IF_MAYBE(n, finalRead) { 298 KJ_ASSERT(*n == 0); 299 break; 300 } else { 301 in.whenReady().wait(io.waitScope); 302 } 303 } 304 } 305 306 } // namespace 307 } // namespace kj