capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

io.c++ (14639B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #ifndef _GNU_SOURCE
     23 #define _GNU_SOURCE
     24 #endif
     25 
     26 #if _WIN32
     27 #include "win32-api-version.h"
     28 #endif
     29 
     30 #include "io.h"
     31 #include "debug.h"
     32 #include "miniposix.h"
     33 #include <algorithm>
     34 #include <errno.h>
     35 #include "vector.h"
     36 
     37 #if _WIN32
     38 #include <windows.h>
     39 #include "windows-sanity.h"
     40 #else
     41 #include <sys/uio.h>
     42 #endif
     43 
     44 namespace kj {
     45 
     46 InputStream::~InputStream() noexcept(false) {}
     47 OutputStream::~OutputStream() noexcept(false) {}
     48 BufferedInputStream::~BufferedInputStream() noexcept(false) {}
     49 BufferedOutputStream::~BufferedOutputStream() noexcept(false) {}
     50 
     51 size_t InputStream::read(void* buffer, size_t minBytes, size_t maxBytes) {
     52   size_t n = tryRead(buffer, minBytes, maxBytes);
     53   KJ_REQUIRE(n >= minBytes, "Premature EOF") {
     54     // Pretend we read zeros from the input.
     55     memset(reinterpret_cast<byte*>(buffer) + n, 0, minBytes - n);
     56     return minBytes;
     57   }
     58   return n;
     59 }
     60 
     61 void InputStream::skip(size_t bytes) {
     62   char scratch[8192];
     63   while (bytes > 0) {
     64     size_t amount = std::min(bytes, sizeof(scratch));
     65     read(scratch, amount);
     66     bytes -= amount;
     67   }
     68 }
     69 
     70 
     71 namespace {
     72 
     73 Array<byte> readAll(InputStream& input, uint64_t limit, bool nulTerminate) {
     74   Vector<Array<byte>> parts;
     75   constexpr size_t BLOCK_SIZE = 4096;
     76 
     77   for (;;) {
     78     KJ_REQUIRE(limit > 0, "Reached limit before EOF.");
     79     auto part = heapArray<byte>(kj::min(BLOCK_SIZE, limit));
     80     size_t n = input.tryRead(part.begin(), part.size(), part.size());
     81     limit -= n;
     82     if (n < part.size()) {
     83       auto result = heapArray<byte>(parts.size() * BLOCK_SIZE + n + nulTerminate);
     84       byte* pos = result.begin();
     85       for (auto& p: parts) {
     86         memcpy(pos, p.begin(), BLOCK_SIZE);
     87         pos += BLOCK_SIZE;
     88       }
     89       memcpy(pos, part.begin(), n);
     90       pos += n;
     91       if (nulTerminate) *pos++ = '\0';
     92       KJ_ASSERT(pos == result.end());
     93       return result;
     94     } else {
     95       parts.add(kj::mv(part));
     96     }
     97   }
     98 }
     99 
    100 }  // namespace
    101 
    102 String InputStream::readAllText(uint64_t limit) {
    103   return String(readAll(*this, limit, true).releaseAsChars());
    104 }
    105 Array<byte> InputStream::readAllBytes(uint64_t limit) {
    106   return readAll(*this, limit, false);
    107 }
    108 
    109 void OutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
    110   for (auto piece: pieces) {
    111     write(piece.begin(), piece.size());
    112   }
    113 }
    114 
    115 ArrayPtr<const byte> BufferedInputStream::getReadBuffer() {
    116   auto result = tryGetReadBuffer();
    117   KJ_REQUIRE(result.size() > 0, "Premature EOF");
    118   return result;
    119 }
    120 
    121 // =======================================================================================
    122 
    123 BufferedInputStreamWrapper::BufferedInputStreamWrapper(InputStream& inner, ArrayPtr<byte> buffer)
    124     : inner(inner), ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
    125       buffer(buffer == nullptr ? ownedBuffer : buffer) {}
    126 
    127 BufferedInputStreamWrapper::~BufferedInputStreamWrapper() noexcept(false) {}
    128 
    129 ArrayPtr<const byte> BufferedInputStreamWrapper::tryGetReadBuffer() {
    130   if (bufferAvailable.size() == 0) {
    131     size_t n = inner.tryRead(buffer.begin(), 1, buffer.size());
    132     bufferAvailable = buffer.slice(0, n);
    133   }
    134 
    135   return bufferAvailable;
    136 }
    137 
    138 size_t BufferedInputStreamWrapper::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
    139   if (minBytes <= bufferAvailable.size()) {
    140     // Serve from current buffer.
    141     size_t n = std::min(bufferAvailable.size(), maxBytes);
    142     memcpy(dst, bufferAvailable.begin(), n);
    143     bufferAvailable = bufferAvailable.slice(n, bufferAvailable.size());
    144     return n;
    145   } else {
    146     // Copy current available into destination.
    147     memcpy(dst, bufferAvailable.begin(), bufferAvailable.size());
    148     size_t fromFirstBuffer = bufferAvailable.size();
    149 
    150     dst = reinterpret_cast<byte*>(dst) + fromFirstBuffer;
    151     minBytes -= fromFirstBuffer;
    152     maxBytes -= fromFirstBuffer;
    153 
    154     if (maxBytes <= buffer.size()) {
    155       // Read the next buffer-full.
    156       size_t n = inner.read(buffer.begin(), minBytes, buffer.size());
    157       size_t fromSecondBuffer = std::min(n, maxBytes);
    158       memcpy(dst, buffer.begin(), fromSecondBuffer);
    159       bufferAvailable = buffer.slice(fromSecondBuffer, n);
    160       return fromFirstBuffer + fromSecondBuffer;
    161     } else {
    162       // Forward large read to the underlying stream.
    163       bufferAvailable = nullptr;
    164       return fromFirstBuffer + inner.read(dst, minBytes, maxBytes);
    165     }
    166   }
    167 }
    168 
    169 void BufferedInputStreamWrapper::skip(size_t bytes) {
    170   if (bytes <= bufferAvailable.size()) {
    171     bufferAvailable = bufferAvailable.slice(bytes, bufferAvailable.size());
    172   } else {
    173     bytes -= bufferAvailable.size();
    174     if (bytes <= buffer.size()) {
    175       // Read the next buffer-full.
    176       size_t n = inner.read(buffer.begin(), bytes, buffer.size());
    177       bufferAvailable = buffer.slice(bytes, n);
    178     } else {
    179       // Forward large skip to the underlying stream.
    180       bufferAvailable = nullptr;
    181       inner.skip(bytes);
    182     }
    183   }
    184 }
    185 
    186 // -------------------------------------------------------------------
    187 
    188 BufferedOutputStreamWrapper::BufferedOutputStreamWrapper(OutputStream& inner, ArrayPtr<byte> buffer)
    189     : inner(inner),
    190       ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
    191       buffer(buffer == nullptr ? ownedBuffer : buffer),
    192       bufferPos(this->buffer.begin()) {}
    193 
    194 BufferedOutputStreamWrapper::~BufferedOutputStreamWrapper() noexcept(false) {
    195   unwindDetector.catchExceptionsIfUnwinding([&]() {
    196     flush();
    197   });
    198 }
    199 
    200 void BufferedOutputStreamWrapper::flush() {
    201   if (bufferPos > buffer.begin()) {
    202     inner.write(buffer.begin(), bufferPos - buffer.begin());
    203     bufferPos = buffer.begin();
    204   }
    205 }
    206 
    207 ArrayPtr<byte> BufferedOutputStreamWrapper::getWriteBuffer() {
    208   return arrayPtr(bufferPos, buffer.end());
    209 }
    210 
    211 void BufferedOutputStreamWrapper::write(const void* src, size_t size) {
    212   if (src == bufferPos) {
    213     // Oh goody, the caller wrote directly into our buffer.
    214     bufferPos += size;
    215   } else {
    216     size_t available = buffer.end() - bufferPos;
    217 
    218     if (size <= available) {
    219       memcpy(bufferPos, src, size);
    220       bufferPos += size;
    221     } else if (size <= buffer.size()) {
    222       // Too much for this buffer, but not a full buffer's worth, so we'll go ahead and copy.
    223       memcpy(bufferPos, src, available);
    224       inner.write(buffer.begin(), buffer.size());
    225 
    226       size -= available;
    227       src = reinterpret_cast<const byte*>(src) + available;
    228 
    229       memcpy(buffer.begin(), src, size);
    230       bufferPos = buffer.begin() + size;
    231     } else {
    232       // Writing so much data that we might as well write directly to avoid a copy.
    233       inner.write(buffer.begin(), bufferPos - buffer.begin());
    234       bufferPos = buffer.begin();
    235       inner.write(src, size);
    236     }
    237   }
    238 }
    239 
    240 // =======================================================================================
    241 
    242 ArrayInputStream::ArrayInputStream(ArrayPtr<const byte> array): array(array) {}
    243 ArrayInputStream::~ArrayInputStream() noexcept(false) {}
    244 
    245 ArrayPtr<const byte> ArrayInputStream::tryGetReadBuffer() {
    246   return array;
    247 }
    248 
    249 size_t ArrayInputStream::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
    250   size_t n = std::min(maxBytes, array.size());
    251   memcpy(dst, array.begin(), n);
    252   array = array.slice(n, array.size());
    253   return n;
    254 }
    255 
    256 void ArrayInputStream::skip(size_t bytes) {
    257   KJ_REQUIRE(array.size() >= bytes, "ArrayInputStream ended prematurely.") {
    258     bytes = array.size();
    259     break;
    260   }
    261   array = array.slice(bytes, array.size());
    262 }
    263 
    264 // -------------------------------------------------------------------
    265 
    266 ArrayOutputStream::ArrayOutputStream(ArrayPtr<byte> array): array(array), fillPos(array.begin()) {}
    267 ArrayOutputStream::~ArrayOutputStream() noexcept(false) {}
    268 
    269 ArrayPtr<byte> ArrayOutputStream::getWriteBuffer() {
    270   return arrayPtr(fillPos, array.end());
    271 }
    272 
    273 void ArrayOutputStream::write(const void* src, size_t size) {
    274   if (src == fillPos && fillPos != array.end()) {
    275     // Oh goody, the caller wrote directly into our buffer.
    276     KJ_REQUIRE(size <= array.end() - fillPos, size, fillPos, array.end() - fillPos);
    277     fillPos += size;
    278   } else {
    279     KJ_REQUIRE(size <= (size_t)(array.end() - fillPos),
    280             "ArrayOutputStream's backing array was not large enough for the data written.");
    281     memcpy(fillPos, src, size);
    282     fillPos += size;
    283   }
    284 }
    285 
    286 // -------------------------------------------------------------------
    287 
    288 VectorOutputStream::VectorOutputStream(size_t initialCapacity)
    289     : vector(heapArray<byte>(initialCapacity)), fillPos(vector.begin()) {}
    290 VectorOutputStream::~VectorOutputStream() noexcept(false) {}
    291 
    292 ArrayPtr<byte> VectorOutputStream::getWriteBuffer() {
    293   // Grow if needed.
    294   if (fillPos == vector.end()) {
    295     grow(vector.size() + 1);
    296   }
    297 
    298   return arrayPtr(fillPos, vector.end());
    299 }
    300 
    301 void VectorOutputStream::write(const void* src, size_t size) {
    302   if (src == fillPos && fillPos != vector.end()) {
    303     // Oh goody, the caller wrote directly into our buffer.
    304     KJ_REQUIRE(size <= vector.end() - fillPos, size, fillPos, vector.end() - fillPos);
    305     fillPos += size;
    306   } else {
    307     if (vector.end() - fillPos < size) {
    308       grow(fillPos - vector.begin() + size);
    309     }
    310 
    311     memcpy(fillPos, src, size);
    312     fillPos += size;
    313   }
    314 }
    315 
    316 void VectorOutputStream::grow(size_t minSize) {
    317   size_t newSize = vector.size() * 2;
    318   while (newSize < minSize) newSize *= 2;
    319   auto newVector = heapArray<byte>(newSize);
    320   memcpy(newVector.begin(), vector.begin(), fillPos - vector.begin());
    321   fillPos = fillPos - vector.begin() + newVector.begin();
    322   vector = kj::mv(newVector);
    323 }
    324 
    325 // =======================================================================================
    326 
    327 AutoCloseFd::~AutoCloseFd() noexcept(false) {
    328   if (fd >= 0) {
    329     // Don't use SYSCALL() here because close() should not be repeated on EINTR.
    330     if (miniposix::close(fd) < 0) {
    331       KJ_FAIL_SYSCALL("close", errno, fd) {
    332         // This ensures we don't throw an exception if unwinding.
    333         break;
    334       }
    335     }
    336   }
    337 }
    338 
    339 FdInputStream::~FdInputStream() noexcept(false) {}
    340 
    341 size_t FdInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
    342   byte* pos = reinterpret_cast<byte*>(buffer);
    343   byte* min = pos + minBytes;
    344   byte* max = pos + maxBytes;
    345 
    346   while (pos < min) {
    347     miniposix::ssize_t n;
    348     KJ_SYSCALL(n = miniposix::read(fd, pos, max - pos), fd);
    349     if (n == 0) {
    350       break;
    351     }
    352     pos += n;
    353   }
    354 
    355   return pos - reinterpret_cast<byte*>(buffer);
    356 }
    357 
    358 FdOutputStream::~FdOutputStream() noexcept(false) {}
    359 
    360 void FdOutputStream::write(const void* buffer, size_t size) {
    361   const char* pos = reinterpret_cast<const char*>(buffer);
    362 
    363   while (size > 0) {
    364     miniposix::ssize_t n;
    365     KJ_SYSCALL(n = miniposix::write(fd, pos, size), fd);
    366     KJ_ASSERT(n > 0, "write() returned zero.");
    367     pos += n;
    368     size -= n;
    369   }
    370 }
    371 
    372 void FdOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
    373 #if _WIN32
    374   // Windows has no reasonable writev(). It has WriteFileGather, but this call has the unreasonable
    375   // restriction that each segment must be page-aligned. So, fall back to the default implementation
    376 
    377   OutputStream::write(pieces);
    378 
    379 #else
    380   const size_t iovmax = miniposix::iovMax();
    381   while (pieces.size() > iovmax) {
    382     write(pieces.slice(0, iovmax));
    383     pieces = pieces.slice(iovmax, pieces.size());
    384   }
    385 
    386   KJ_STACK_ARRAY(struct iovec, iov, pieces.size(), 16, 128);
    387 
    388   for (uint i = 0; i < pieces.size(); i++) {
    389     // writev() interface is not const-correct.  :(
    390     iov[i].iov_base = const_cast<byte*>(pieces[i].begin());
    391     iov[i].iov_len = pieces[i].size();
    392   }
    393 
    394   struct iovec* current = iov.begin();
    395 
    396   // Advance past any leading empty buffers so that a write full of only empty buffers does not
    397   // cause a syscall at all.
    398   while (current < iov.end() && current->iov_len == 0) {
    399     ++current;
    400   }
    401 
    402   while (current < iov.end()) {
    403     // Issue the write.
    404     ssize_t n = 0;
    405     KJ_SYSCALL(n = ::writev(fd, current, iov.end() - current), fd);
    406     KJ_ASSERT(n > 0, "writev() returned zero.");
    407 
    408     // Advance past all buffers that were fully-written.
    409     while (current < iov.end() && static_cast<size_t>(n) >= current->iov_len) {
    410       n -= current->iov_len;
    411       ++current;
    412     }
    413 
    414     // If we only partially-wrote one of the buffers, adjust the pointer and size to include only
    415     // the unwritten part.
    416     if (n > 0) {
    417       current->iov_base = reinterpret_cast<byte*>(current->iov_base) + n;
    418       current->iov_len -= n;
    419     }
    420   }
    421 #endif
    422 }
    423 
    424 // =======================================================================================
    425 
    426 #if _WIN32
    427 
    428 AutoCloseHandle::~AutoCloseHandle() noexcept(false) {
    429   if (handle != (void*)-1) {
    430     KJ_WIN32(CloseHandle(handle));
    431   }
    432 }
    433 
    434 HandleInputStream::~HandleInputStream() noexcept(false) {}
    435 
    436 size_t HandleInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
    437   byte* pos = reinterpret_cast<byte*>(buffer);
    438   byte* min = pos + minBytes;
    439   byte* max = pos + maxBytes;
    440 
    441   while (pos < min) {
    442     DWORD n;
    443     KJ_WIN32(ReadFile(handle, pos, kj::min(max - pos, DWORD(kj::maxValue)), &n, nullptr));
    444     if (n == 0) {
    445       break;
    446     }
    447     pos += n;
    448   }
    449 
    450   return pos - reinterpret_cast<byte*>(buffer);
    451 }
    452 
    453 HandleOutputStream::~HandleOutputStream() noexcept(false) {}
    454 
    455 void HandleOutputStream::write(const void* buffer, size_t size) {
    456   const char* pos = reinterpret_cast<const char*>(buffer);
    457 
    458   while (size > 0) {
    459     DWORD n;
    460     KJ_WIN32(WriteFile(handle, pos, kj::min(size, DWORD(kj::maxValue)), &n, nullptr));
    461     KJ_ASSERT(n > 0, "write() returned zero.");
    462     pos += n;
    463     size -= n;
    464   }
    465 }
    466 
    467 #endif  // _WIN32
    468 
    469 }  // namespace kj