capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

readiness-io-test.c++ (8861B)


      1 // Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #include "readiness-io.h"
     23 #include <kj/test.h>
     24 #include <stdlib.h>
     25 
     26 namespace kj {
     27 namespace {
     28 
     29 KJ_TEST("readiness IO: write small") {
     30   auto io = setupAsyncIo();
     31   auto pipe = io.provider->newOneWayPipe();
     32 
     33   char buf[4];
     34   auto readPromise = pipe.in->read(buf, 3, 4);
     35 
     36   ReadyOutputStreamWrapper out(*pipe.out);
     37   KJ_ASSERT(KJ_ASSERT_NONNULL(out.write(kj::StringPtr("foo").asBytes())) == 3);
     38 
     39   KJ_ASSERT(readPromise.wait(io.waitScope) == 3);
     40   buf[3] = '\0';
     41   KJ_ASSERT(kj::StringPtr(buf) == "foo");
     42 }
     43 
     44 KJ_TEST("readiness IO: write many odd") {
     45   auto io = setupAsyncIo();
     46   auto pipe = io.provider->newOneWayPipe();
     47 
     48   ReadyOutputStreamWrapper out(*pipe.out);
     49 
     50   size_t totalWritten = 0;
     51   for (;;) {
     52     KJ_IF_MAYBE(n, out.write(kj::StringPtr("bar").asBytes())) {
     53       totalWritten += *n;
     54       if (*n < 3) {
     55         break;
     56       }
     57     } else {
     58       KJ_FAIL_ASSERT("pipe buffer is divisible by 3? really?");
     59     }
     60   }
     61 
     62   auto buf = kj::heapArray<char>(totalWritten + 1);
     63   size_t n = pipe.in->read(buf.begin(), totalWritten, buf.size()).wait(io.waitScope);
     64   KJ_ASSERT(n == totalWritten);
     65   for (size_t i = 0; i < totalWritten; i++) {
     66     KJ_ASSERT(buf[i] == "bar"[i%3]);
     67   }
     68 }
     69 
     70 KJ_TEST("readiness IO: write even") {
     71   auto io = setupAsyncIo();
     72   auto pipe = io.provider->newOneWayPipe();
     73 
     74   ReadyOutputStreamWrapper out(*pipe.out);
     75 
     76   size_t totalWritten = 0;
     77   for (;;) {
     78     KJ_IF_MAYBE(n, out.write(kj::StringPtr("ba").asBytes())) {
     79       totalWritten += *n;
     80       if (*n < 2) {
     81         KJ_FAIL_ASSERT("pipe buffer is not divisible by 2? really?");
     82       }
     83     } else {
     84       break;
     85     }
     86   }
     87 
     88   auto buf = kj::heapArray<char>(totalWritten + 1);
     89   size_t n = pipe.in->read(buf.begin(), totalWritten, buf.size()).wait(io.waitScope);
     90   KJ_ASSERT(n == totalWritten);
     91   for (size_t i = 0; i < totalWritten; i++) {
     92     KJ_ASSERT(buf[i] == "ba"[i%2]);
     93   }
     94 }
     95 
     96 KJ_TEST("readiness IO: write while corked") {
     97   auto io = setupAsyncIo();
     98   auto pipe = io.provider->newOneWayPipe();
     99 
    100   char buf[7];
    101   auto readPromise = pipe.in->read(buf, 3, 7);
    102 
    103   ReadyOutputStreamWrapper out(*pipe.out);
    104   auto cork = out.cork();
    105   KJ_ASSERT(KJ_ASSERT_NONNULL(out.write(kj::StringPtr("foo").asBytes())) == 3);
    106 
    107   // Data hasn't been written yet.
    108   KJ_ASSERT(!readPromise.poll(io.waitScope));
    109 
    110   // Write some more, and observe it still isn't flushed out yet.
    111   KJ_ASSERT(KJ_ASSERT_NONNULL(out.write(kj::StringPtr("bar").asBytes())) == 3);
    112   KJ_ASSERT(!readPromise.poll(io.waitScope));
    113 
    114   // After reenabling pumping, the full read should succeed.
    115   // We start this block with `if (true) {` instead of just `{` to avoid g++-8 compiler warnings
    116   // telling us that this block isn't treated as part of KJ_ASSERT's internal `for` loop.
    117   if (true) {
    118     auto tmp = kj::mv(cork);
    119   }
    120   KJ_ASSERT(readPromise.wait(io.waitScope) == 6);
    121   buf[6] = '\0';
    122   KJ_ASSERT(kj::StringPtr(buf) == "foobar");
    123 }
    124 
    125 KJ_TEST("readiness IO: write many odd while corked") {
    126   auto io = setupAsyncIo();
    127   auto pipe = io.provider->newOneWayPipe();
    128 
    129   // The even/odd tests should work just as before even with automatic pumping
    130   // corked, since we should still pump when the buffer fills up.
    131   ReadyOutputStreamWrapper out(*pipe.out);
    132   auto cork = out.cork();
    133 
    134   size_t totalWritten = 0;
    135   for (;;) {
    136     KJ_IF_MAYBE(n, out.write(kj::StringPtr("bar").asBytes())) {
    137       totalWritten += *n;
    138       if (*n < 3) {
    139         break;
    140       }
    141     } else {
    142       KJ_FAIL_ASSERT("pipe buffer is divisible by 3? really?");
    143     }
    144   }
    145 
    146   auto buf = kj::heapArray<char>(totalWritten + 1);
    147   size_t n = pipe.in->read(buf.begin(), totalWritten, buf.size()).wait(io.waitScope);
    148   KJ_ASSERT(n == totalWritten);
    149   for (size_t i = 0; i < totalWritten; i++) {
    150     KJ_ASSERT(buf[i] == "bar"[i%3]);
    151   }
    152 
    153   // Eager pumping should still be corked.
    154   KJ_ASSERT(KJ_ASSERT_NONNULL(out.write(kj::StringPtr("bar").asBytes())) == 3);
    155   auto readPromise = pipe.in->read(buf.begin(), 3, buf.size());
    156   KJ_ASSERT(!readPromise.poll(io.waitScope));
    157 }
    158 
    159 KJ_TEST("readiness IO: write many even while corked") {
    160   auto io = setupAsyncIo();
    161   auto pipe = io.provider->newOneWayPipe();
    162 
    163   ReadyOutputStreamWrapper out(*pipe.out);
    164   auto cork = out.cork();
    165 
    166   size_t totalWritten = 0;
    167   for (;;) {
    168     KJ_IF_MAYBE(n, out.write(kj::StringPtr("ba").asBytes())) {
    169       totalWritten += *n;
    170       if (*n < 2) {
    171         KJ_FAIL_ASSERT("pipe buffer is not divisible by 2? really?");
    172       }
    173     } else {
    174       break;
    175     }
    176   }
    177 
    178   auto buf = kj::heapArray<char>(totalWritten + 1);
    179   size_t n = pipe.in->read(buf.begin(), totalWritten, buf.size()).wait(io.waitScope);
    180   KJ_ASSERT(n == totalWritten);
    181   for (size_t i = 0; i < totalWritten; i++) {
    182     KJ_ASSERT(buf[i] == "ba"[i%2]);
    183   }
    184 
    185   // Eager pumping should still be corked.
    186   KJ_ASSERT(KJ_ASSERT_NONNULL(out.write(kj::StringPtr("ba").asBytes())) == 2);
    187   auto readPromise = pipe.in->read(buf.begin(), 2, buf.size());
    188   KJ_ASSERT(!readPromise.poll(io.waitScope));
    189 }
    190 
    191 KJ_TEST("readiness IO: read small") {
    192   auto io = setupAsyncIo();
    193   auto pipe = io.provider->newOneWayPipe();
    194 
    195   ReadyInputStreamWrapper in(*pipe.in);
    196   char buf[4];
    197   KJ_ASSERT(in.read(kj::ArrayPtr<char>(buf).asBytes()) == nullptr);
    198 
    199   pipe.out->write("foo", 3).wait(io.waitScope);
    200 
    201   in.whenReady().wait(io.waitScope);
    202   KJ_ASSERT(KJ_ASSERT_NONNULL(in.read(kj::ArrayPtr<char>(buf).asBytes())) == 3);
    203   buf[3] = '\0';
    204   KJ_ASSERT(kj::StringPtr(buf) == "foo");
    205 
    206   pipe.out = nullptr;
    207 
    208   kj::Maybe<size_t> finalRead;
    209   for (;;) {
    210     finalRead = in.read(kj::ArrayPtr<char>(buf).asBytes());
    211     KJ_IF_MAYBE(n, finalRead) {
    212       KJ_ASSERT(*n == 0);
    213       break;
    214     } else {
    215       in.whenReady().wait(io.waitScope);
    216     }
    217   }
    218 }
    219 
    220 KJ_TEST("readiness IO: read many odd") {
    221   auto io = setupAsyncIo();
    222   auto pipe = io.provider->newOneWayPipe();
    223 
    224   char dummy[8192];
    225   for (auto i: kj::indices(dummy)) {
    226     dummy[i] = "bar"[i%3];
    227   }
    228   auto writeTask = pipe.out->write(dummy, sizeof(dummy)).then([&]() {
    229     // shutdown
    230     pipe.out = nullptr;
    231   }).eagerlyEvaluate(nullptr);
    232 
    233   ReadyInputStreamWrapper in(*pipe.in);
    234   char buf[3];
    235 
    236   for (;;) {
    237     auto result = in.read(kj::ArrayPtr<char>(buf).asBytes());
    238     KJ_IF_MAYBE(n, result) {
    239       for (size_t i = 0; i < *n; i++) {
    240         KJ_ASSERT(buf[i] == "bar"[i]);
    241       }
    242       KJ_ASSERT(*n != 0, "ended at wrong spot");
    243       if (*n < 3) {
    244         break;
    245       }
    246     } else {
    247       in.whenReady().wait(io.waitScope);
    248     }
    249   }
    250 
    251   kj::Maybe<size_t> finalRead;
    252   for (;;) {
    253     finalRead = in.read(kj::ArrayPtr<char>(buf).asBytes());
    254     KJ_IF_MAYBE(n, finalRead) {
    255       KJ_ASSERT(*n == 0);
    256       break;
    257     } else {
    258       in.whenReady().wait(io.waitScope);
    259     }
    260   }
    261 }
    262 
    263 KJ_TEST("readiness IO: read many even") {
    264   auto io = setupAsyncIo();
    265   auto pipe = io.provider->newOneWayPipe();
    266 
    267   char dummy[8192];
    268   for (auto i: kj::indices(dummy)) {
    269     dummy[i] = "ba"[i%2];
    270   }
    271   auto writeTask = pipe.out->write(dummy, sizeof(dummy)).then([&]() {
    272     // shutdown
    273     pipe.out = nullptr;
    274   }).eagerlyEvaluate(nullptr);
    275 
    276   ReadyInputStreamWrapper in(*pipe.in);
    277   char buf[2];
    278 
    279   for (;;) {
    280     auto result = in.read(kj::ArrayPtr<char>(buf).asBytes());
    281     KJ_IF_MAYBE(n, result) {
    282       for (size_t i = 0; i < *n; i++) {
    283         KJ_ASSERT(buf[i] == "ba"[i]);
    284       }
    285       if (*n == 0) {
    286         break;
    287       }
    288       KJ_ASSERT(*n == 2, "ended at wrong spot");
    289     } else {
    290       in.whenReady().wait(io.waitScope);
    291     }
    292   }
    293 
    294   kj::Maybe<size_t> finalRead;
    295   for (;;) {
    296     finalRead = in.read(kj::ArrayPtr<char>(buf).asBytes());
    297     KJ_IF_MAYBE(n, finalRead) {
    298       KJ_ASSERT(*n == 0);
    299       break;
    300     } else {
    301       in.whenReady().wait(io.waitScope);
    302     }
    303   }
    304 }
    305 
    306 }  // namespace
    307 }  // namespace kj