io.c++ (14639B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #ifndef _GNU_SOURCE 23 #define _GNU_SOURCE 24 #endif 25 26 #if _WIN32 27 #include "win32-api-version.h" 28 #endif 29 30 #include "io.h" 31 #include "debug.h" 32 #include "miniposix.h" 33 #include <algorithm> 34 #include <errno.h> 35 #include "vector.h" 36 37 #if _WIN32 38 #include <windows.h> 39 #include "windows-sanity.h" 40 #else 41 #include <sys/uio.h> 42 #endif 43 44 namespace kj { 45 46 InputStream::~InputStream() noexcept(false) {} 47 OutputStream::~OutputStream() noexcept(false) {} 48 BufferedInputStream::~BufferedInputStream() noexcept(false) {} 49 BufferedOutputStream::~BufferedOutputStream() noexcept(false) {} 50 51 size_t InputStream::read(void* buffer, size_t minBytes, size_t maxBytes) { 52 size_t n = tryRead(buffer, minBytes, maxBytes); 53 KJ_REQUIRE(n >= minBytes, "Premature EOF") { 54 // Pretend we read zeros from the input. 55 memset(reinterpret_cast<byte*>(buffer) + n, 0, minBytes - n); 56 return minBytes; 57 } 58 return n; 59 } 60 61 void InputStream::skip(size_t bytes) { 62 char scratch[8192]; 63 while (bytes > 0) { 64 size_t amount = std::min(bytes, sizeof(scratch)); 65 read(scratch, amount); 66 bytes -= amount; 67 } 68 } 69 70 71 namespace { 72 73 Array<byte> readAll(InputStream& input, uint64_t limit, bool nulTerminate) { 74 Vector<Array<byte>> parts; 75 constexpr size_t BLOCK_SIZE = 4096; 76 77 for (;;) { 78 KJ_REQUIRE(limit > 0, "Reached limit before EOF."); 79 auto part = heapArray<byte>(kj::min(BLOCK_SIZE, limit)); 80 size_t n = input.tryRead(part.begin(), part.size(), part.size()); 81 limit -= n; 82 if (n < part.size()) { 83 auto result = heapArray<byte>(parts.size() * BLOCK_SIZE + n + nulTerminate); 84 byte* pos = result.begin(); 85 for (auto& p: parts) { 86 memcpy(pos, p.begin(), BLOCK_SIZE); 87 pos += BLOCK_SIZE; 88 } 89 memcpy(pos, part.begin(), n); 90 pos += n; 91 if (nulTerminate) *pos++ = '\0'; 92 KJ_ASSERT(pos == result.end()); 93 return result; 94 } else { 95 parts.add(kj::mv(part)); 96 } 97 } 98 } 99 100 } // namespace 101 102 String InputStream::readAllText(uint64_t limit) { 103 return String(readAll(*this, limit, true).releaseAsChars()); 104 } 105 Array<byte> InputStream::readAllBytes(uint64_t limit) { 106 return readAll(*this, limit, false); 107 } 108 109 void OutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) { 110 for (auto piece: pieces) { 111 write(piece.begin(), piece.size()); 112 } 113 } 114 115 ArrayPtr<const byte> BufferedInputStream::getReadBuffer() { 116 auto result = tryGetReadBuffer(); 117 KJ_REQUIRE(result.size() > 0, "Premature EOF"); 118 return result; 119 } 120 121 // ======================================================================================= 122 123 BufferedInputStreamWrapper::BufferedInputStreamWrapper(InputStream& inner, ArrayPtr<byte> buffer) 124 : inner(inner), ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr), 125 buffer(buffer == nullptr ? ownedBuffer : buffer) {} 126 127 BufferedInputStreamWrapper::~BufferedInputStreamWrapper() noexcept(false) {} 128 129 ArrayPtr<const byte> BufferedInputStreamWrapper::tryGetReadBuffer() { 130 if (bufferAvailable.size() == 0) { 131 size_t n = inner.tryRead(buffer.begin(), 1, buffer.size()); 132 bufferAvailable = buffer.slice(0, n); 133 } 134 135 return bufferAvailable; 136 } 137 138 size_t BufferedInputStreamWrapper::tryRead(void* dst, size_t minBytes, size_t maxBytes) { 139 if (minBytes <= bufferAvailable.size()) { 140 // Serve from current buffer. 141 size_t n = std::min(bufferAvailable.size(), maxBytes); 142 memcpy(dst, bufferAvailable.begin(), n); 143 bufferAvailable = bufferAvailable.slice(n, bufferAvailable.size()); 144 return n; 145 } else { 146 // Copy current available into destination. 147 memcpy(dst, bufferAvailable.begin(), bufferAvailable.size()); 148 size_t fromFirstBuffer = bufferAvailable.size(); 149 150 dst = reinterpret_cast<byte*>(dst) + fromFirstBuffer; 151 minBytes -= fromFirstBuffer; 152 maxBytes -= fromFirstBuffer; 153 154 if (maxBytes <= buffer.size()) { 155 // Read the next buffer-full. 156 size_t n = inner.read(buffer.begin(), minBytes, buffer.size()); 157 size_t fromSecondBuffer = std::min(n, maxBytes); 158 memcpy(dst, buffer.begin(), fromSecondBuffer); 159 bufferAvailable = buffer.slice(fromSecondBuffer, n); 160 return fromFirstBuffer + fromSecondBuffer; 161 } else { 162 // Forward large read to the underlying stream. 163 bufferAvailable = nullptr; 164 return fromFirstBuffer + inner.read(dst, minBytes, maxBytes); 165 } 166 } 167 } 168 169 void BufferedInputStreamWrapper::skip(size_t bytes) { 170 if (bytes <= bufferAvailable.size()) { 171 bufferAvailable = bufferAvailable.slice(bytes, bufferAvailable.size()); 172 } else { 173 bytes -= bufferAvailable.size(); 174 if (bytes <= buffer.size()) { 175 // Read the next buffer-full. 176 size_t n = inner.read(buffer.begin(), bytes, buffer.size()); 177 bufferAvailable = buffer.slice(bytes, n); 178 } else { 179 // Forward large skip to the underlying stream. 180 bufferAvailable = nullptr; 181 inner.skip(bytes); 182 } 183 } 184 } 185 186 // ------------------------------------------------------------------- 187 188 BufferedOutputStreamWrapper::BufferedOutputStreamWrapper(OutputStream& inner, ArrayPtr<byte> buffer) 189 : inner(inner), 190 ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr), 191 buffer(buffer == nullptr ? ownedBuffer : buffer), 192 bufferPos(this->buffer.begin()) {} 193 194 BufferedOutputStreamWrapper::~BufferedOutputStreamWrapper() noexcept(false) { 195 unwindDetector.catchExceptionsIfUnwinding([&]() { 196 flush(); 197 }); 198 } 199 200 void BufferedOutputStreamWrapper::flush() { 201 if (bufferPos > buffer.begin()) { 202 inner.write(buffer.begin(), bufferPos - buffer.begin()); 203 bufferPos = buffer.begin(); 204 } 205 } 206 207 ArrayPtr<byte> BufferedOutputStreamWrapper::getWriteBuffer() { 208 return arrayPtr(bufferPos, buffer.end()); 209 } 210 211 void BufferedOutputStreamWrapper::write(const void* src, size_t size) { 212 if (src == bufferPos) { 213 // Oh goody, the caller wrote directly into our buffer. 214 bufferPos += size; 215 } else { 216 size_t available = buffer.end() - bufferPos; 217 218 if (size <= available) { 219 memcpy(bufferPos, src, size); 220 bufferPos += size; 221 } else if (size <= buffer.size()) { 222 // Too much for this buffer, but not a full buffer's worth, so we'll go ahead and copy. 223 memcpy(bufferPos, src, available); 224 inner.write(buffer.begin(), buffer.size()); 225 226 size -= available; 227 src = reinterpret_cast<const byte*>(src) + available; 228 229 memcpy(buffer.begin(), src, size); 230 bufferPos = buffer.begin() + size; 231 } else { 232 // Writing so much data that we might as well write directly to avoid a copy. 233 inner.write(buffer.begin(), bufferPos - buffer.begin()); 234 bufferPos = buffer.begin(); 235 inner.write(src, size); 236 } 237 } 238 } 239 240 // ======================================================================================= 241 242 ArrayInputStream::ArrayInputStream(ArrayPtr<const byte> array): array(array) {} 243 ArrayInputStream::~ArrayInputStream() noexcept(false) {} 244 245 ArrayPtr<const byte> ArrayInputStream::tryGetReadBuffer() { 246 return array; 247 } 248 249 size_t ArrayInputStream::tryRead(void* dst, size_t minBytes, size_t maxBytes) { 250 size_t n = std::min(maxBytes, array.size()); 251 memcpy(dst, array.begin(), n); 252 array = array.slice(n, array.size()); 253 return n; 254 } 255 256 void ArrayInputStream::skip(size_t bytes) { 257 KJ_REQUIRE(array.size() >= bytes, "ArrayInputStream ended prematurely.") { 258 bytes = array.size(); 259 break; 260 } 261 array = array.slice(bytes, array.size()); 262 } 263 264 // ------------------------------------------------------------------- 265 266 ArrayOutputStream::ArrayOutputStream(ArrayPtr<byte> array): array(array), fillPos(array.begin()) {} 267 ArrayOutputStream::~ArrayOutputStream() noexcept(false) {} 268 269 ArrayPtr<byte> ArrayOutputStream::getWriteBuffer() { 270 return arrayPtr(fillPos, array.end()); 271 } 272 273 void ArrayOutputStream::write(const void* src, size_t size) { 274 if (src == fillPos && fillPos != array.end()) { 275 // Oh goody, the caller wrote directly into our buffer. 276 KJ_REQUIRE(size <= array.end() - fillPos, size, fillPos, array.end() - fillPos); 277 fillPos += size; 278 } else { 279 KJ_REQUIRE(size <= (size_t)(array.end() - fillPos), 280 "ArrayOutputStream's backing array was not large enough for the data written."); 281 memcpy(fillPos, src, size); 282 fillPos += size; 283 } 284 } 285 286 // ------------------------------------------------------------------- 287 288 VectorOutputStream::VectorOutputStream(size_t initialCapacity) 289 : vector(heapArray<byte>(initialCapacity)), fillPos(vector.begin()) {} 290 VectorOutputStream::~VectorOutputStream() noexcept(false) {} 291 292 ArrayPtr<byte> VectorOutputStream::getWriteBuffer() { 293 // Grow if needed. 294 if (fillPos == vector.end()) { 295 grow(vector.size() + 1); 296 } 297 298 return arrayPtr(fillPos, vector.end()); 299 } 300 301 void VectorOutputStream::write(const void* src, size_t size) { 302 if (src == fillPos && fillPos != vector.end()) { 303 // Oh goody, the caller wrote directly into our buffer. 304 KJ_REQUIRE(size <= vector.end() - fillPos, size, fillPos, vector.end() - fillPos); 305 fillPos += size; 306 } else { 307 if (vector.end() - fillPos < size) { 308 grow(fillPos - vector.begin() + size); 309 } 310 311 memcpy(fillPos, src, size); 312 fillPos += size; 313 } 314 } 315 316 void VectorOutputStream::grow(size_t minSize) { 317 size_t newSize = vector.size() * 2; 318 while (newSize < minSize) newSize *= 2; 319 auto newVector = heapArray<byte>(newSize); 320 memcpy(newVector.begin(), vector.begin(), fillPos - vector.begin()); 321 fillPos = fillPos - vector.begin() + newVector.begin(); 322 vector = kj::mv(newVector); 323 } 324 325 // ======================================================================================= 326 327 AutoCloseFd::~AutoCloseFd() noexcept(false) { 328 if (fd >= 0) { 329 // Don't use SYSCALL() here because close() should not be repeated on EINTR. 330 if (miniposix::close(fd) < 0) { 331 KJ_FAIL_SYSCALL("close", errno, fd) { 332 // This ensures we don't throw an exception if unwinding. 333 break; 334 } 335 } 336 } 337 } 338 339 FdInputStream::~FdInputStream() noexcept(false) {} 340 341 size_t FdInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) { 342 byte* pos = reinterpret_cast<byte*>(buffer); 343 byte* min = pos + minBytes; 344 byte* max = pos + maxBytes; 345 346 while (pos < min) { 347 miniposix::ssize_t n; 348 KJ_SYSCALL(n = miniposix::read(fd, pos, max - pos), fd); 349 if (n == 0) { 350 break; 351 } 352 pos += n; 353 } 354 355 return pos - reinterpret_cast<byte*>(buffer); 356 } 357 358 FdOutputStream::~FdOutputStream() noexcept(false) {} 359 360 void FdOutputStream::write(const void* buffer, size_t size) { 361 const char* pos = reinterpret_cast<const char*>(buffer); 362 363 while (size > 0) { 364 miniposix::ssize_t n; 365 KJ_SYSCALL(n = miniposix::write(fd, pos, size), fd); 366 KJ_ASSERT(n > 0, "write() returned zero."); 367 pos += n; 368 size -= n; 369 } 370 } 371 372 void FdOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) { 373 #if _WIN32 374 // Windows has no reasonable writev(). It has WriteFileGather, but this call has the unreasonable 375 // restriction that each segment must be page-aligned. So, fall back to the default implementation 376 377 OutputStream::write(pieces); 378 379 #else 380 const size_t iovmax = miniposix::iovMax(); 381 while (pieces.size() > iovmax) { 382 write(pieces.slice(0, iovmax)); 383 pieces = pieces.slice(iovmax, pieces.size()); 384 } 385 386 KJ_STACK_ARRAY(struct iovec, iov, pieces.size(), 16, 128); 387 388 for (uint i = 0; i < pieces.size(); i++) { 389 // writev() interface is not const-correct. :( 390 iov[i].iov_base = const_cast<byte*>(pieces[i].begin()); 391 iov[i].iov_len = pieces[i].size(); 392 } 393 394 struct iovec* current = iov.begin(); 395 396 // Advance past any leading empty buffers so that a write full of only empty buffers does not 397 // cause a syscall at all. 398 while (current < iov.end() && current->iov_len == 0) { 399 ++current; 400 } 401 402 while (current < iov.end()) { 403 // Issue the write. 404 ssize_t n = 0; 405 KJ_SYSCALL(n = ::writev(fd, current, iov.end() - current), fd); 406 KJ_ASSERT(n > 0, "writev() returned zero."); 407 408 // Advance past all buffers that were fully-written. 409 while (current < iov.end() && static_cast<size_t>(n) >= current->iov_len) { 410 n -= current->iov_len; 411 ++current; 412 } 413 414 // If we only partially-wrote one of the buffers, adjust the pointer and size to include only 415 // the unwritten part. 416 if (n > 0) { 417 current->iov_base = reinterpret_cast<byte*>(current->iov_base) + n; 418 current->iov_len -= n; 419 } 420 } 421 #endif 422 } 423 424 // ======================================================================================= 425 426 #if _WIN32 427 428 AutoCloseHandle::~AutoCloseHandle() noexcept(false) { 429 if (handle != (void*)-1) { 430 KJ_WIN32(CloseHandle(handle)); 431 } 432 } 433 434 HandleInputStream::~HandleInputStream() noexcept(false) {} 435 436 size_t HandleInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) { 437 byte* pos = reinterpret_cast<byte*>(buffer); 438 byte* min = pos + minBytes; 439 byte* max = pos + maxBytes; 440 441 while (pos < min) { 442 DWORD n; 443 KJ_WIN32(ReadFile(handle, pos, kj::min(max - pos, DWORD(kj::maxValue)), &n, nullptr)); 444 if (n == 0) { 445 break; 446 } 447 pos += n; 448 } 449 450 return pos - reinterpret_cast<byte*>(buffer); 451 } 452 453 HandleOutputStream::~HandleOutputStream() noexcept(false) {} 454 455 void HandleOutputStream::write(const void* buffer, size_t size) { 456 const char* pos = reinterpret_cast<const char*>(buffer); 457 458 while (size > 0) { 459 DWORD n; 460 KJ_WIN32(WriteFile(handle, pos, kj::min(size, DWORD(kj::maxValue)), &n, nullptr)); 461 KJ_ASSERT(n > 0, "write() returned zero."); 462 pos += n; 463 size -= n; 464 } 465 } 466 467 #endif // _WIN32 468 469 } // namespace kj