capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

readiness-io.c++ (5022B)


      1 // Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #include "readiness-io.h"
     23 
     24 namespace kj {
     25 
     26 static size_t copyInto(kj::ArrayPtr<byte> dst, kj::ArrayPtr<const byte>& src) {
     27   size_t n = kj::min(dst.size(), src.size());
     28   memcpy(dst.begin(), src.begin(), n);
     29   src = src.slice(n, src.size());
     30   return n;
     31 }
     32 
     33 // =======================================================================================
     34 
     35 ReadyInputStreamWrapper::ReadyInputStreamWrapper(AsyncInputStream& input): input(input) {}
     36 ReadyInputStreamWrapper::~ReadyInputStreamWrapper() noexcept(false) {}
     37 
     38 kj::Maybe<size_t> ReadyInputStreamWrapper::read(kj::ArrayPtr<byte> dst) {
     39   if (eof || dst.size() == 0) return size_t(0);
     40 
     41   if (content.size() == 0) {
     42     // No data available. Try to read more.
     43     if (!isPumping) {
     44       isPumping = true;
     45       pumpTask = kj::evalNow([&]() {
     46         return input.tryRead(buffer, 1, sizeof(buffer)).then([this](size_t n) {
     47           if (n == 0) {
     48             eof = true;
     49           } else {
     50             content = kj::arrayPtr(buffer, n);
     51           }
     52           isPumping = false;
     53         });
     54       }).fork();
     55     }
     56 
     57     return nullptr;
     58   }
     59 
     60   return copyInto(dst, content);
     61 }
     62 
     63 kj::Promise<void> ReadyInputStreamWrapper::whenReady() {
     64   return pumpTask.addBranch();
     65 }
     66 
     67 // =======================================================================================
     68 
     69 ReadyOutputStreamWrapper::ReadyOutputStreamWrapper(AsyncOutputStream& output): output(output) {}
     70 ReadyOutputStreamWrapper::~ReadyOutputStreamWrapper() noexcept(false) {}
     71 
     72 kj::Maybe<size_t> ReadyOutputStreamWrapper::write(kj::ArrayPtr<const byte> data) {
     73   if (data.size() == 0) return size_t(0);
     74 
     75   if (filled == sizeof(buffer)) {
     76     // No space.
     77     return nullptr;
     78   }
     79 
     80   uint end = start + filled;
     81   size_t result = 0;
     82   if (end < sizeof(buffer)) {
     83     // The filled part of the buffer is somewhere in the middle.
     84 
     85     // Copy into space after filled space.
     86     result += copyInto(kj::arrayPtr(buffer + end, buffer + sizeof(buffer)), data);
     87 
     88     // Copy into space before filled space.
     89     result += copyInto(kj::arrayPtr(buffer, buffer + start), data);
     90   } else {
     91     // Fill currently loops, so we only have one segment of empty space to copy into.
     92 
     93     // Copy into the space between the fill's end and the fill's start.
     94     result += copyInto(kj::arrayPtr(buffer + end % sizeof(buffer), buffer + start), data);
     95   }
     96 
     97   filled += result;
     98 
     99   if (!isPumping && (!corked || filled == sizeof(buffer))) {
    100     isPumping = true;
    101     pumpTask = kj::evalNow([&]() {
    102       return pump();
    103     }).fork();
    104   }
    105 
    106   return result;
    107 }
    108 
    109 kj::Promise<void> ReadyOutputStreamWrapper::whenReady() {
    110   return pumpTask.addBranch();
    111 }
    112 
    113 ReadyOutputStreamWrapper::Cork ReadyOutputStreamWrapper::cork() {
    114   corked = true;
    115   return Cork(*this);
    116 }
    117 
    118 void ReadyOutputStreamWrapper::uncork() {
    119   corked = false;
    120   if (!isPumping && filled > 0) {
    121     isPumping = true;
    122     pumpTask = kj::evalNow([&]() {
    123       return pump();
    124     }).fork();
    125   }
    126 }
    127 
    128 kj::Promise<void> ReadyOutputStreamWrapper::pump() {
    129   uint oldFilled = filled;
    130   uint end = start + filled;
    131 
    132   kj::Promise<void> promise = nullptr;
    133   if (end <= sizeof(buffer)) {
    134     promise = output.write(buffer + start, filled);
    135   } else {
    136     end = end % sizeof(buffer);
    137     segments[0] = kj::arrayPtr(buffer + start, buffer + sizeof(buffer));
    138     segments[1] = kj::arrayPtr(buffer, buffer + end);
    139     promise = output.write(segments);
    140   }
    141 
    142   return promise.then([this,oldFilled,end]() -> kj::Promise<void> {
    143     filled -= oldFilled;
    144     start = end;
    145 
    146     if (filled > 0) {
    147       return pump();
    148     } else {
    149       isPumping = false;
    150       // As a small optimization, reset to the start of the buffer when it's empty so we can provide
    151       // the underlying layer just one contiguous chunk of memory instead of two when possible.
    152       start = 0;
    153       return kj::READY_NOW;
    154     }
    155   });
    156 }
    157 
    158 }  // namespace kj
    159