capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

http-test.c++ (134395B)


      1 // Copyright (c) 2017 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #define KJ_TESTING_KJ 1
     23 
     24 #include "http.h"
     25 #include <kj/debug.h>
     26 #include <kj/test.h>
     27 #include <kj/encoding.h>
     28 #include <map>
     29 
#if KJ_HTTP_TEST_USE_OS_PIPE
// Run the test using OS-level socketpairs. (See http-socketpair-test.c++.)

// Declares `io` (the result of kj::setupAsyncIo()) and `waitScope` (a reference to io.waitScope)
// in the enclosing scope. KJ_UNUSED presumably silences unused-variable warnings in tests that
// never touch `waitScope`.
#define KJ_HTTP_TEST_SETUP_IO \
  auto io = kj::setupAsyncIo(); \
  auto& waitScope KJ_UNUSED = io.waitScope
// Declares `listener`, listening on "localhost" with port 0, and `addr`, the parsed address of
// "localhost" at the port reported by the listener. Uses the `io` and `waitScope` names declared
// by KJ_HTTP_TEST_SETUP_IO.
#define KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR \
  auto listener = io.provider->getNetwork().parseAddress("localhost", 0) \
      .wait(waitScope)->listen(); \
  auto addr = io.provider->getNetwork().parseAddress("localhost", listener->getPort()) \
      .wait(waitScope)
// Expression producing a two-way pipe from the I/O provider held by `io`.
#define KJ_HTTP_TEST_CREATE_2PIPE \
  io.provider->newTwoWayPipe()
#else
// Run the test using in-process two-way pipes.

// Declares a local `eventLoop` and a `waitScope` constructed from it. Unlike the OS-pipe variant,
// no `io` object is declared.
#define KJ_HTTP_TEST_SETUP_IO \
  kj::EventLoop eventLoop; \
  kj::WaitScope waitScope(eventLoop)
// Declares `capPipe` (from newCapabilityPipe()), a `listener` wrapping capPipe.ends[0], and an
// `addr` wrapping capPipe.ends[1] (constructed with nullptr as its first argument).
#define KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR \
  auto capPipe = newCapabilityPipe(); \
  auto listener = kj::heap<kj::CapabilityStreamConnectionReceiver>(*capPipe.ends[0]); \
  auto addr = kj::heap<kj::CapabilityStreamNetworkAddress>(nullptr, *capPipe.ends[1])
// Expression producing an in-process two-way pipe.
#define KJ_HTTP_TEST_CREATE_2PIPE \
  kj::newTwoWayPipe()
#endif
     54 
     55 namespace kj {
     56 namespace {
     57 
     58 KJ_TEST("HttpMethod parse / stringify") {
     59 #define TRY(name) \
     60   KJ_EXPECT(kj::str(HttpMethod::name) == #name); \
     61   KJ_IF_MAYBE(parsed, tryParseHttpMethod(#name)) { \
     62     KJ_EXPECT(*parsed == HttpMethod::name); \
     63   } else { \
     64     KJ_FAIL_EXPECT("couldn't parse \"" #name "\" as HttpMethod"); \
     65   }
     66 
     67   KJ_HTTP_FOR_EACH_METHOD(TRY)
     68 #undef TRY
     69 
     70   KJ_EXPECT(tryParseHttpMethod("FOO") == nullptr);
     71   KJ_EXPECT(tryParseHttpMethod("") == nullptr);
     72   KJ_EXPECT(tryParseHttpMethod("G") == nullptr);
     73   KJ_EXPECT(tryParseHttpMethod("GE") == nullptr);
     74   KJ_EXPECT(tryParseHttpMethod("GET ") == nullptr);
     75   KJ_EXPECT(tryParseHttpMethod("get") == nullptr);
     76 }
     77 
     78 KJ_TEST("HttpHeaderTable") {
     79   HttpHeaderTable::Builder builder;
     80 
     81   auto host = builder.add("Host");
     82   auto host2 = builder.add("hOsT");
     83   auto fooBar = builder.add("Foo-Bar");
     84   auto bazQux = builder.add("baz-qux");
     85   auto bazQux2 = builder.add("Baz-Qux");
     86 
     87   auto table = builder.build();
     88 
     89   uint builtinHeaderCount = 0;
     90 #define INCREMENT(id, name) ++builtinHeaderCount;
     91   KJ_HTTP_FOR_EACH_BUILTIN_HEADER(INCREMENT)
     92 #undef INCREMENT
     93 
     94   KJ_EXPECT(table->idCount() == builtinHeaderCount + 2);
     95 
     96   KJ_EXPECT(host == HttpHeaderId::HOST);
     97   KJ_EXPECT(host != HttpHeaderId::DATE);
     98   KJ_EXPECT(host2 == host);
     99 
    100   KJ_EXPECT(host != fooBar);
    101   KJ_EXPECT(host != bazQux);
    102   KJ_EXPECT(fooBar != bazQux);
    103   KJ_EXPECT(bazQux == bazQux2);
    104 
    105   KJ_EXPECT(kj::str(host) == "Host");
    106   KJ_EXPECT(kj::str(host2) == "Host");
    107   KJ_EXPECT(kj::str(fooBar) == "Foo-Bar");
    108   KJ_EXPECT(kj::str(bazQux) == "baz-qux");
    109   KJ_EXPECT(kj::str(HttpHeaderId::HOST) == "Host");
    110 
    111   KJ_EXPECT(table->idToString(HttpHeaderId::DATE) == "Date");
    112   KJ_EXPECT(table->idToString(fooBar) == "Foo-Bar");
    113 
    114   KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Date")) == HttpHeaderId::DATE);
    115   KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("dATE")) == HttpHeaderId::DATE);
    116   KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Foo-Bar")) == fooBar);
    117   KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("foo-BAR")) == fooBar);
    118   KJ_EXPECT(table->stringToId("foobar") == nullptr);
    119   KJ_EXPECT(table->stringToId("barfoo") == nullptr);
    120 }
    121 
    122 KJ_TEST("HttpHeaders::parseRequest") {
    123   HttpHeaderTable::Builder builder;
    124 
    125   auto fooBar = builder.add("Foo-Bar");
    126   auto bazQux = builder.add("baz-qux");
    127 
    128   auto table = builder.build();
    129 
    130   HttpHeaders headers(*table);
    131   auto text = kj::heapString(
    132       "POST   /some/path \t   HTTP/1.1\r\n"
    133       "Foo-BaR: Baz\r\n"
    134       "Host: example.com\r\n"
    135       "Content-Length: 123\r\n"
    136       "DATE:     early\r\n"
    137       "other-Header: yep\r\n"
    138       "with.dots: sure\r\n"
    139       "\r\n");
    140   auto result = headers.tryParseRequest(text.asArray()).get<HttpHeaders::Request>();
    141 
    142   KJ_EXPECT(result.method == HttpMethod::POST);
    143   KJ_EXPECT(result.url == "/some/path");
    144   KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com");
    145   KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early");
    146   KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz");
    147   KJ_EXPECT(headers.get(bazQux) == nullptr);
    148   KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr);
    149   KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::CONTENT_LENGTH)) == "123");
    150   KJ_EXPECT(headers.get(HttpHeaderId::TRANSFER_ENCODING) == nullptr);
    151 
    152   std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders;
    153   headers.forEach([&](kj::StringPtr name, kj::StringPtr value) {
    154     KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second);
    155   });
    156   KJ_EXPECT(unpackedHeaders.size() == 6);
    157   KJ_EXPECT(unpackedHeaders["Content-Length"] == "123");
    158   KJ_EXPECT(unpackedHeaders["Host"] == "example.com");
    159   KJ_EXPECT(unpackedHeaders["Date"] == "early");
    160   KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz");
    161   KJ_EXPECT(unpackedHeaders["other-Header"] == "yep");
    162   KJ_EXPECT(unpackedHeaders["with.dots"] == "sure");
    163 
    164   KJ_EXPECT(headers.serializeRequest(result.method, result.url) ==
    165       "POST /some/path HTTP/1.1\r\n"
    166       "Content-Length: 123\r\n"
    167       "Host: example.com\r\n"
    168       "Date: early\r\n"
    169       "Foo-Bar: Baz\r\n"
    170       "other-Header: yep\r\n"
    171       "with.dots: sure\r\n"
    172       "\r\n");
    173 }
    174 
    175 KJ_TEST("HttpHeaders::parseResponse") {
    176   HttpHeaderTable::Builder builder;
    177 
    178   auto fooBar = builder.add("Foo-Bar");
    179   auto bazQux = builder.add("baz-qux");
    180 
    181   auto table = builder.build();
    182 
    183   HttpHeaders headers(*table);
    184   auto text = kj::heapString(
    185       "HTTP/1.1\t\t  418\t    I'm a teapot\r\n"
    186       "Foo-BaR: Baz\r\n"
    187       "Host: example.com\r\n"
    188       "Content-Length: 123\r\n"
    189       "DATE:     early\r\n"
    190       "other-Header: yep\r\n"
    191       "\r\n");
    192   auto result = headers.tryParseResponse(text.asArray()).get<HttpHeaders::Response>();
    193 
    194   KJ_EXPECT(result.statusCode == 418);
    195   KJ_EXPECT(result.statusText == "I'm a teapot");
    196   KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com");
    197   KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early");
    198   KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz");
    199   KJ_EXPECT(headers.get(bazQux) == nullptr);
    200   KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr);
    201   KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::CONTENT_LENGTH)) == "123");
    202   KJ_EXPECT(headers.get(HttpHeaderId::TRANSFER_ENCODING) == nullptr);
    203 
    204   std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders;
    205   headers.forEach([&](kj::StringPtr name, kj::StringPtr value) {
    206     KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second);
    207   });
    208   KJ_EXPECT(unpackedHeaders.size() == 5);
    209   KJ_EXPECT(unpackedHeaders["Content-Length"] == "123");
    210   KJ_EXPECT(unpackedHeaders["Host"] == "example.com");
    211   KJ_EXPECT(unpackedHeaders["Date"] == "early");
    212   KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz");
    213   KJ_EXPECT(unpackedHeaders["other-Header"] == "yep");
    214 
    215   KJ_EXPECT(headers.serializeResponse(
    216         result.statusCode, result.statusText) ==
    217       "HTTP/1.1 418 I'm a teapot\r\n"
    218       "Content-Length: 123\r\n"
    219       "Host: example.com\r\n"
    220       "Date: early\r\n"
    221       "Foo-Bar: Baz\r\n"
    222       "other-Header: yep\r\n"
    223       "\r\n");
    224 }
    225 
// Feeds malformed header blocks to HttpHeaders::tryParseRequest() and checks, for each, that a
// ProtocolError is produced with the expected description and that its rawContent equals the
// input that was passed in.
KJ_TEST("HttpHeaders parse invalid") {
  auto table = HttpHeaderTable::Builder().build();
  HttpHeaders headers(*table);

  // NUL byte in request.
  // NOTE(review): the '\0' is embedded in a plain string literal. If kj::heapString() measures a
  // `const char*` argument up to the first NUL, `input` ends right after "POST  ", which would
  // explain the "no terminal newline" description expected below -- confirm.
  {
    auto input = kj::heapString(
        "POST  \0 /some/path \t   HTTP/1.1\r\n"
        "Foo-BaR: Baz\r\n"
        "Host: example.com\r\n"
        "DATE:     early\r\n"
        "other-Header: yep\r\n"
        "\r\n");

    auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>();

    KJ_EXPECT(protocolError.description == "Request headers have no terminal newline.",
        protocolError.description);
    KJ_EXPECT(protocolError.rawContent.asChars() == input);
  }

  // Control character in header name.
  {
    auto input = kj::heapString(
        "POST   /some/path \t   HTTP/1.1\r\n"
        "Foo-BaR: Baz\r\n"
        "Cont\001ent-Length: 123\r\n"
        "DATE:     early\r\n"
        "other-Header: yep\r\n"
        "\r\n");

    auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>();

    KJ_EXPECT(protocolError.description == "The headers sent by your client are not valid.",
        protocolError.description);
    KJ_EXPECT(protocolError.rawContent.asChars() == input);
  }

  // Separator character in header name.
  {
     auto input = kj::heapString(
        "POST   /some/path \t   HTTP/1.1\r\n"
        "Foo-BaR: Baz\r\n"
        "Host: example.com\r\n"
        "DATE/:     early\r\n"
        "other-Header: yep\r\n"
        "\r\n");

    auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>();

    KJ_EXPECT(protocolError.description == "The headers sent by your client are not valid.",
        protocolError.description);
    KJ_EXPECT(protocolError.rawContent.asChars() == input);
  }

  // A response status line (with a non-numeric status code) handed to the *request* parser. As
  // written, this verifies that tryParseRequest() rejects the leading "HTTP/1.1" token with
  // "Unrecognized request method."
  // NOTE(review): if the intent was to cover a non-numeric status code in tryParseResponse(), this
  // case does not exercise that path -- confirm.
  {
     auto input = kj::heapString(
      "HTTP/1.1\t\t  abc\t    I'm a teapot\r\n"
      "Foo-BaR: Baz\r\n"
      "Host: example.com\r\n"
      "DATE:     early\r\n"
      "other-Header: yep\r\n"
      "\r\n");

    auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>();

    KJ_EXPECT(protocolError.description == "Unrecognized request method.",
        protocolError.description);
    KJ_EXPECT(protocolError.rawContent.asChars() == input);
  }
}
    298 
// Checks which header names and values HttpHeaders::add() / set() accept without throwing and
// which they reject with an "invalid header name" / "invalid header value" exception.
KJ_TEST("HttpHeaders validation") {
  auto table = HttpHeaderTable::Builder().build();
  HttpHeaders headers(*table);

  headers.add("Valid-Name", "valid value");

  // The HTTP RFC prohibits control characters, but browsers only prohibit \0, \r, and \n. KJ goes
  // with the browsers for compatibility.
  headers.add("Valid-Name", "valid\x01value");

  // The HTTP RFC does not permit non-ASCII values.
  // KJ chooses to interpret them as UTF-8, to avoid the need for any expensive conversion.
  // Browsers apparently interpret them as LATIN-1. Applications can reinterpret these strings as
  // LATIN-1 easily enough if they really need to.
  headers.add("Valid-Name", u8"valid€value");

  // Names containing a space or '@' are rejected.
  KJ_EXPECT_THROW_MESSAGE("invalid header name", headers.add("Invalid Name", "value"));
  KJ_EXPECT_THROW_MESSAGE("invalid header name", headers.add("Invalid@Name", "value"));

  // A newline inside a value is rejected, both via set() on an indexed header and via add().
  KJ_EXPECT_THROW_MESSAGE("invalid header value", headers.set(HttpHeaderId::HOST, "in\nvalid"));
  KJ_EXPECT_THROW_MESSAGE("invalid header value", headers.add("Valid-Name", "in\nvalid"));
}
    321 
    322 KJ_TEST("HttpHeaders Set-Cookie handling") {
    323   HttpHeaderTable::Builder builder;
    324   auto hCookie = builder.add("Cookie");
    325   auto hSetCookie = builder.add("Set-Cookie");
    326   auto table = builder.build();
    327 
    328   HttpHeaders headers(*table);
    329   headers.set(hCookie, "Foo");
    330   headers.add("Cookie", "Bar");
    331   headers.add("Cookie", "Baz");
    332   headers.set(hSetCookie, "Foo");
    333   headers.add("Set-Cookie", "Bar");
    334   headers.add("Set-Cookie", "Baz");
    335 
    336   auto text = headers.toString();
    337   KJ_EXPECT(text ==
    338       "Cookie: Foo, Bar, Baz\r\n"
    339       "Set-Cookie: Foo\r\n"
    340       "Set-Cookie: Bar\r\n"
    341       "Set-Cookie: Baz\r\n"
    342       "\r\n", text);
    343 }
    344 
    345 // =======================================================================================
    346 
    347 class ReadFragmenter final: public kj::AsyncIoStream {
    348 public:
    349   ReadFragmenter(AsyncIoStream& inner, size_t limit): inner(inner), limit(limit) {}
    350 
    351   Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
    352     return inner.read(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes)));
    353   }
    354   Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    355     return inner.tryRead(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes)));
    356   }
    357 
    358   Maybe<uint64_t> tryGetLength() override { return inner.tryGetLength(); }
    359 
    360   Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
    361     return inner.pumpTo(output, amount);
    362   }
    363 
    364   Promise<void> write(const void* buffer, size_t size) override {
    365     return inner.write(buffer, size);
    366   }
    367   Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
    368     return inner.write(pieces);
    369   }
    370 
    371   Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
    372     return inner.tryPumpFrom(input, amount);
    373   }
    374 
    375   Promise<void> whenWriteDisconnected() override {
    376     return inner.whenWriteDisconnected();
    377   }
    378 
    379   void shutdownWrite() override {
    380     return inner.shutdownWrite();
    381   }
    382 
    383   void abortRead() override { return inner.abortRead(); }
    384 
    385   void getsockopt(int level, int option, void* value, uint* length) override {
    386     return inner.getsockopt(level, option, value, length);
    387   }
    388   void setsockopt(int level, int option, const void* value, uint length) override {
    389     return inner.setsockopt(level, option, value, length);
    390   }
    391 
    392   void getsockname(struct sockaddr* addr, uint* length) override {
    393     return inner.getsockname(addr, length);
    394   }
    395   void getpeername(struct sockaddr* addr, uint* length) override {
    396     return inner.getsockname(addr, length);
    397   }
    398 
    399 private:
    400   kj::AsyncIoStream& inner;
    401   size_t limit;
    402 };
    403 
// An Array<T> that can be constructed from a braced initializer list. The elements of `init` are
// copied into a newly allocated array via kj::heapArray(init). The test-case structs in this file
// use it for their header and body-part lists.
template <typename T>
class InitializeableArray: public Array<T> {
public:
  InitializeableArray(std::initializer_list<T> init)
      : Array<T>(kj::heapArray(init)) {}
};
    410 
// Value of the `side` field of HttpRequestTestCase / HttpResponseTestCase (default: BOTH).
// NOTE(review): presumably selects whether a case is exercised against the client, the server, or
// both; the code that consumes it is not in this part of the file -- confirm.
enum Side { BOTH, CLIENT_ONLY, SERVER_ONLY };
    412 
// One header that a test case sets (when sending) or expects to find (when receiving).
struct HeaderTestCase {
  HttpHeaderId id;      // which header
  kj::StringPtr value;  // the value to set, or the value expected from headers.get(id)
};
    417 
    418 struct HttpRequestTestCase {
    419   kj::StringPtr raw;
    420 
    421   HttpMethod method;
    422   kj::StringPtr path;
    423   InitializeableArray<HeaderTestCase> requestHeaders;
    424   kj::Maybe<uint64_t> requestBodySize;
    425   InitializeableArray<kj::StringPtr> requestBodyParts;
    426 
    427   Side side = BOTH;
    428 
    429   // TODO(cleanup): Delete this constructor if/when we move to C++14.
    430   HttpRequestTestCase(kj::StringPtr raw, HttpMethod method, kj::StringPtr path,
    431                       InitializeableArray<HeaderTestCase> requestHeaders,
    432                       kj::Maybe<uint64_t> requestBodySize,
    433                       InitializeableArray<kj::StringPtr> requestBodyParts,
    434                       Side side = BOTH)
    435       : raw(raw), method(method), path(path), requestHeaders(kj::mv(requestHeaders)),
    436         requestBodySize(requestBodySize), requestBodyParts(kj::mv(requestBodyParts)),
    437         side(side) {}
    438 };
    439 
    440 struct HttpResponseTestCase {
    441   kj::StringPtr raw;
    442 
    443   uint64_t statusCode;
    444   kj::StringPtr statusText;
    445   InitializeableArray<HeaderTestCase> responseHeaders;
    446   kj::Maybe<uint64_t> responseBodySize;
    447   InitializeableArray<kj::StringPtr> responseBodyParts;
    448 
    449   HttpMethod method = HttpMethod::GET;
    450 
    451   Side side = BOTH;
    452 
    453   // TODO(cleanup): Delete this constructor if/when we move to C++14.
    454   HttpResponseTestCase(kj::StringPtr raw, uint64_t statusCode, kj::StringPtr statusText,
    455                        InitializeableArray<HeaderTestCase> responseHeaders,
    456                        kj::Maybe<uint64_t> responseBodySize,
    457                        InitializeableArray<kj::StringPtr> responseBodyParts,
    458                        HttpMethod method = HttpMethod::GET,
    459                        Side side = BOTH)
    460       : raw(raw), statusCode(statusCode), statusText(statusText),
    461         responseHeaders(kj::mv(responseHeaders)), responseBodySize(responseBodySize),
    462         responseBodyParts(kj::mv(responseBodyParts)), method(method), side(side) {}
    463 };
    464 
// A request paired with the response that is sent in reply to it.
struct HttpTestCase {
  HttpRequestTestCase request;
  HttpResponseTestCase response;
};
    469 
    470 kj::Promise<void> writeEach(kj::AsyncOutputStream& out, kj::ArrayPtr<const kj::StringPtr> parts) {
    471   if (parts.size() == 0) return kj::READY_NOW;
    472 
    473   return out.write(parts[0].begin(), parts[0].size())
    474       .then([&out,parts]() {
    475     return writeEach(out, parts.slice(1, parts.size()));
    476   });
    477 }
    478 
    479 kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
    480   if (expected.size() == 0) return kj::READY_NOW;
    481 
    482   auto buffer = kj::heapArray<char>(expected.size());
    483 
    484   auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
    485   return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) {
    486     if (amount == 0) {
    487       KJ_FAIL_ASSERT("expected data never sent", expected);
    488     }
    489 
    490     auto actual = buffer.slice(0, amount);
    491     if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
    492       KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
    493     }
    494 
    495     return expectRead(in, expected.slice(amount));
    496   }));
    497 }
    498 
    499 kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::ArrayPtr<const byte> expected) {
    500   if (expected.size() == 0) return kj::READY_NOW;
    501 
    502   auto buffer = kj::heapArray<byte>(expected.size());
    503 
    504   auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
    505   return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<byte> buffer, size_t amount) {
    506     if (amount == 0) {
    507       KJ_FAIL_ASSERT("expected data never sent", expected);
    508     }
    509 
    510     auto actual = buffer.slice(0, amount);
    511     if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
    512       KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
    513     }
    514 
    515     return expectRead(in, expected.slice(amount, expected.size()));
    516   }));
    517 }
    518 
    519 kj::Promise<void> expectEnd(kj::AsyncInputStream& in) {
    520   static char buffer;
    521 
    522   auto promise = in.tryRead(&buffer, 1, 1);
    523   return promise.then([](size_t amount) {
    524     KJ_ASSERT(amount == 0, "expected EOF");
    525   });
    526 }
    527 
    528 void testHttpClientRequest(kj::WaitScope& waitScope, const HttpRequestTestCase& testCase,
    529                            kj::TwoWayPipe pipe) {
    530 
    531   auto serverTask = expectRead(*pipe.ends[1], testCase.raw).then([&]() {
    532     static const char SIMPLE_RESPONSE[] =
    533         "HTTP/1.1 200 OK\r\n"
    534         "Content-Length: 0\r\n"
    535         "\r\n";
    536     return pipe.ends[1]->write(SIMPLE_RESPONSE, strlen(SIMPLE_RESPONSE));
    537   }).then([&]() -> kj::Promise<void> {
    538     return kj::NEVER_DONE;
    539   });
    540 
    541   HttpHeaderTable table;
    542   auto client = newHttpClient(table, *pipe.ends[0]);
    543 
    544   HttpHeaders headers(table);
    545   for (auto& header: testCase.requestHeaders) {
    546     headers.set(header.id, header.value);
    547   }
    548 
    549   auto request = client->request(testCase.method, testCase.path, headers, testCase.requestBodySize);
    550   if (testCase.requestBodyParts.size() > 0) {
    551     writeEach(*request.body, testCase.requestBodyParts).wait(waitScope);
    552   }
    553   request.body = nullptr;
    554   auto clientTask = request.response
    555       .then([&](HttpClient::Response&& response) {
    556     auto promise = response.body->readAllText();
    557     return promise.attach(kj::mv(response.body));
    558   }).ignoreResult();
    559 
    560   serverTask.exclusiveJoin(kj::mv(clientTask)).wait(waitScope);
    561 
    562   // Verify no more data written by client.
    563   client = nullptr;
    564   pipe.ends[0]->shutdownWrite();
    565   KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
    566 }
    567 
    568 void testHttpClientResponse(kj::WaitScope& waitScope, const HttpResponseTestCase& testCase,
    569                             size_t readFragmentSize, kj::TwoWayPipe pipe) {
    570   ReadFragmenter fragmenter(*pipe.ends[0], readFragmentSize);
    571 
    572   auto expectedReqText = testCase.method == HttpMethod::GET || testCase.method == HttpMethod::HEAD
    573       ? kj::str(testCase.method, " / HTTP/1.1\r\n\r\n")
    574       : kj::str(testCase.method, " / HTTP/1.1\r\nContent-Length: 0\r\n");
    575 
    576   auto serverTask = expectRead(*pipe.ends[1], expectedReqText).then([&]() {
    577     return pipe.ends[1]->write(testCase.raw.begin(), testCase.raw.size());
    578   }).then([&]() -> kj::Promise<void> {
    579     pipe.ends[1]->shutdownWrite();
    580     return kj::NEVER_DONE;
    581   });
    582 
    583   HttpHeaderTable table;
    584   auto client = newHttpClient(table, fragmenter);
    585 
    586   HttpHeaders headers(table);
    587   auto request = client->request(testCase.method, "/", headers, uint64_t(0));
    588   request.body = nullptr;
    589   auto clientTask = request.response
    590       .then([&](HttpClient::Response&& response) {
    591     KJ_EXPECT(response.statusCode == testCase.statusCode);
    592     KJ_EXPECT(response.statusText == testCase.statusText);
    593 
    594     for (auto& header: testCase.responseHeaders) {
    595       KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(header.id)) == header.value);
    596     }
    597     auto promise = response.body->readAllText();
    598     return promise.attach(kj::mv(response.body));
    599   }).then([&](kj::String body) {
    600     KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, ""), body);
    601   });
    602 
    603   serverTask.exclusiveJoin(kj::mv(clientTask)).wait(waitScope);
    604 
    605   // Verify no more data written by client.
    606   client = nullptr;
    607   pipe.ends[0]->shutdownWrite();
    608   KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
    609 }
    610 
    611 void testHttpClient(kj::WaitScope& waitScope, HttpHeaderTable& table,
    612                     HttpClient& client, const HttpTestCase& testCase) {
    613   KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
    614 
    615   HttpHeaders headers(table);
    616   for (auto& header: testCase.request.requestHeaders) {
    617     headers.set(header.id, header.value);
    618   }
    619 
    620   auto request = client.request(
    621       testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
    622   for (auto& part: testCase.request.requestBodyParts) {
    623     request.body->write(part.begin(), part.size()).wait(waitScope);
    624   }
    625   request.body = nullptr;
    626 
    627   auto response = request.response.wait(waitScope);
    628 
    629   KJ_EXPECT(response.statusCode == testCase.response.statusCode);
    630   auto body = response.body->readAllText().wait(waitScope);
    631   if (testCase.request.method == HttpMethod::HEAD) {
    632     KJ_EXPECT(body == "");
    633   } else {
    634     KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
    635   }
    636 }
    637 
    638 class TestHttpService final: public HttpService {
    639 public:
    640   TestHttpService(const HttpRequestTestCase& expectedRequest,
    641                   const HttpResponseTestCase& response,
    642                   HttpHeaderTable& table)
    643       : singleExpectedRequest(&expectedRequest),
    644         singleResponse(&response),
    645         responseHeaders(table) {}
    646   TestHttpService(kj::ArrayPtr<const HttpTestCase> testCases,
    647                   HttpHeaderTable& table)
    648       : singleExpectedRequest(nullptr),
    649         singleResponse(nullptr),
    650         testCases(testCases),
    651         responseHeaders(table) {}
    652 
    653   uint getRequestCount() { return requestCount; }
    654 
    655   kj::Promise<void> request(
    656       HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
    657       kj::AsyncInputStream& requestBody, Response& responseSender) override {
    658     auto& expectedRequest = testCases == nullptr ? *singleExpectedRequest :
    659         testCases[requestCount % testCases.size()].request;
    660     auto& response = testCases == nullptr ? *singleResponse :
    661         testCases[requestCount % testCases.size()].response;
    662 
    663     ++requestCount;
    664 
    665     KJ_EXPECT(method == expectedRequest.method, method);
    666     KJ_EXPECT(url == expectedRequest.path, url);
    667 
    668     for (auto& header: expectedRequest.requestHeaders) {
    669       KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(header.id)) == header.value);
    670     }
    671 
    672     auto size = requestBody.tryGetLength();
    673     KJ_IF_MAYBE(expectedSize, expectedRequest.requestBodySize) {
    674       KJ_IF_MAYBE(s, size) {
    675         KJ_EXPECT(*s == *expectedSize, *s);
    676       } else {
    677         KJ_FAIL_EXPECT("tryGetLength() returned nullptr; expected known size");
    678       }
    679     } else {
    680       KJ_EXPECT(size == nullptr);
    681     }
    682 
    683     return requestBody.readAllText()
    684         .then([this,&expectedRequest,&response,&responseSender](kj::String text) {
    685       KJ_EXPECT(text == kj::strArray(expectedRequest.requestBodyParts, ""), text);
    686 
    687       responseHeaders.clear();
    688       for (auto& header: response.responseHeaders) {
    689         responseHeaders.set(header.id, header.value);
    690       }
    691 
    692       auto stream = responseSender.send(response.statusCode, response.statusText,
    693                                         responseHeaders, response.responseBodySize);
    694       auto promise = writeEach(*stream, response.responseBodyParts);
    695       return promise.attach(kj::mv(stream));
    696     });
    697   }
    698 
    699 private:
    700   const HttpRequestTestCase* singleExpectedRequest;
    701   const HttpResponseTestCase* singleResponse;
    702   kj::ArrayPtr<const HttpTestCase> testCases;
    703   HttpHeaders responseHeaders;
    704   uint requestCount = 0;
    705 };
    706 
    707 void testHttpServerRequest(kj::WaitScope& waitScope, kj::Timer& timer,
    708                            const HttpRequestTestCase& requestCase,
    709                            const HttpResponseTestCase& responseCase,
    710                            kj::TwoWayPipe pipe) {
    711   HttpHeaderTable table;
    712   TestHttpService service(requestCase, responseCase, table);
    713   HttpServer server(timer, table, service);
    714 
    715   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
    716 
    717   pipe.ends[1]->write(requestCase.raw.begin(), requestCase.raw.size()).wait(waitScope);
    718   pipe.ends[1]->shutdownWrite();
    719 
    720   expectRead(*pipe.ends[1], responseCase.raw).wait(waitScope);
    721 
    722   listenTask.wait(waitScope);
    723 
    724   KJ_EXPECT(service.getRequestCount() == 1);
    725 }
    726 
    727 kj::ArrayPtr<const HttpRequestTestCase> requestTestCases() {
    728   static const auto HUGE_STRING = kj::strArray(kj::repeat("abcdefgh", 4096), "");
    729   static const auto HUGE_REQUEST = kj::str(
    730       "GET / HTTP/1.1\r\n"
    731       "Host: ", HUGE_STRING, "\r\n"
    732       "\r\n");
    733 
    734   static const HttpRequestTestCase REQUEST_TEST_CASES[] {
    735     {
    736       "GET /foo/bar HTTP/1.1\r\n"
    737       "Host: example.com\r\n"
    738       "\r\n",
    739 
    740       HttpMethod::GET,
    741       "/foo/bar",
    742       {{HttpHeaderId::HOST, "example.com"}},
    743       uint64_t(0), {},
    744     },
    745 
    746     {
    747       "HEAD /foo/bar HTTP/1.1\r\n"
    748       "Host: example.com\r\n"
    749       "\r\n",
    750 
    751       HttpMethod::HEAD,
    752       "/foo/bar",
    753       {{HttpHeaderId::HOST, "example.com"}},
    754       uint64_t(0), {},
    755     },
    756 
    757     {
    758       "POST / HTTP/1.1\r\n"
    759       "Content-Length: 9\r\n"
    760       "Host: example.com\r\n"
    761       "Content-Type: text/plain\r\n"
    762       "\r\n"
    763       "foobarbaz",
    764 
    765       HttpMethod::POST,
    766       "/",
    767       {
    768         {HttpHeaderId::HOST, "example.com"},
    769         {HttpHeaderId::CONTENT_TYPE, "text/plain"},
    770       },
    771       9, { "foo", "bar", "baz" },
    772     },
    773 
    774     {
    775       "POST / HTTP/1.1\r\n"
    776       "Transfer-Encoding: chunked\r\n"
    777       "Host: example.com\r\n"
    778       "Content-Type: text/plain\r\n"
    779       "\r\n"
    780       "3\r\n"
    781       "foo\r\n"
    782       "6\r\n"
    783       "barbaz\r\n"
    784       "0\r\n"
    785       "\r\n",
    786 
    787       HttpMethod::POST,
    788       "/",
    789       {
    790         {HttpHeaderId::HOST, "example.com"},
    791         {HttpHeaderId::CONTENT_TYPE, "text/plain"},
    792       },
    793       nullptr, { "foo", "barbaz" },
    794     },
    795 
    796     {
    797       "POST / HTTP/1.1\r\n"
    798       "Transfer-Encoding: chunked\r\n"
    799       "Host: example.com\r\n"
    800       "Content-Type: text/plain\r\n"
    801       "\r\n"
    802       "1d\r\n"
    803       "0123456789abcdef0123456789abc\r\n"
    804       "0\r\n"
    805       "\r\n",
    806 
    807       HttpMethod::POST,
    808       "/",
    809       {
    810         {HttpHeaderId::HOST, "example.com"},
    811         {HttpHeaderId::CONTENT_TYPE, "text/plain"},
    812       },
    813       nullptr, { "0123456789abcdef0123456789abc" },
    814     },
    815 
    816     {
    817       HUGE_REQUEST,
    818 
    819       HttpMethod::GET,
    820       "/",
    821       {{HttpHeaderId::HOST, HUGE_STRING}},
    822       uint64_t(0), {}
    823     },
    824 
    825     {
    826       "GET /foo/bar HTTP/1.1\r\n"
    827       "Content-Length: 6\r\n"
    828       "Host: example.com\r\n"
    829       "\r\n"
    830       "foobar",
    831 
    832       HttpMethod::GET,
    833       "/foo/bar",
    834       {{HttpHeaderId::HOST, "example.com"}},
    835       uint64_t(6), { "foobar" },
    836     },
    837 
    838     {
    839       "GET /foo/bar HTTP/1.1\r\n"
    840       "Transfer-Encoding: chunked\r\n"
    841       "Host: example.com\r\n"
    842       "\r\n"
    843       "3\r\n"
    844       "foo\r\n"
    845       "3\r\n"
    846       "bar\r\n"
    847       "0\r\n"
    848       "\r\n",
    849 
    850       HttpMethod::GET,
    851       "/foo/bar",
    852       {{HttpHeaderId::HOST, "example.com"},
    853        {HttpHeaderId::TRANSFER_ENCODING, "chunked"}},
    854       nullptr, { "foo", "bar" },
    855     }
    856   };
    857 
    858   // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents REQUEST_TEST_CASES from implicitly
    859   //   casting to our return type.
    860   return kj::arrayPtr(REQUEST_TEST_CASES, kj::size(REQUEST_TEST_CASES));
    861 }
    862 
    863 kj::ArrayPtr<const HttpResponseTestCase> responseTestCases() {
    864   static const HttpResponseTestCase RESPONSE_TEST_CASES[] {
    865     {
    866       "HTTP/1.1 200 OK\r\n"
    867       "Content-Type: text/plain\r\n"
    868       "Connection: close\r\n"
    869       "\r\n"
    870       "baz qux",
    871 
    872       200, "OK",
    873       {{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
    874       nullptr, {"baz qux"},
    875 
    876       HttpMethod::GET,
    877       CLIENT_ONLY,   // Server never sends connection: close
    878     },
    879 
    880     {
    881       "HTTP/1.1 200 OK\r\n"
    882       "Content-Type: text/plain\r\n"
    883       "Transfer-Encoding: identity\r\n"
    884       "Content-Length: foobar\r\n"  // intentionally wrong
    885       "\r\n"
    886       "baz qux",
    887 
    888       200, "OK",
    889       {{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
    890       nullptr, {"baz qux"},
    891 
    892       HttpMethod::GET,
    893       CLIENT_ONLY,   // Server never sends transfer-encoding: identity
    894     },
    895 
    896     {
    897       "HTTP/1.1 200 OK\r\n"
    898       "Content-Type: text/plain\r\n"
    899       "\r\n"
    900       "baz qux",
    901 
    902       200, "OK",
    903       {{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
    904       nullptr, {"baz qux"},
    905 
    906       HttpMethod::GET,
    907       CLIENT_ONLY,   // Server never sends non-delimited message
    908     },
    909 
    910     {
    911       "HTTP/1.1 200 OK\r\n"
    912       "Content-Length: 123\r\n"
    913       "Content-Type: text/plain\r\n"
    914       "\r\n",
    915 
    916       200, "OK",
    917       {{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
    918       123, {},
    919 
    920       HttpMethod::HEAD,
    921     },
    922 
    923     {
    924       "HTTP/1.1 200 OK\r\n"
    925       "Content-Length: foobar\r\n"
    926       "Content-Type: text/plain\r\n"
    927       "\r\n",
    928 
    929       200, "OK",
    930       {{HttpHeaderId::CONTENT_TYPE, "text/plain"},
    931        {HttpHeaderId::CONTENT_LENGTH, "foobar"}},
    932       123, {},
    933 
    934       HttpMethod::HEAD,
    935     },
    936 
    937     // Zero-length expected size response to HEAD request has no Content-Length header.
    938     {
    939       "HTTP/1.1 200 OK\r\n"
    940       "\r\n",
    941 
    942       200, "OK",
    943       {},
    944       uint64_t(0), {},
    945 
    946       HttpMethod::HEAD,
    947     },
    948 
    949     {
    950       "HTTP/1.1 204 No Content\r\n"
    951       "\r\n",
    952 
    953       204, "No Content",
    954       {},
    955       uint64_t(0), {},
    956     },
    957 
    958     {
    959       "HTTP/1.1 205 Reset Content\r\n"
    960       "Content-Length: 0\r\n"
    961       "\r\n",
    962 
    963       205, "Reset Content",
    964       {},
    965       uint64_t(0), {},
    966     },
    967 
    968     {
    969       "HTTP/1.1 304 Not Modified\r\n"
    970       "\r\n",
    971 
    972       304, "Not Modified",
    973       {},
    974       uint64_t(0), {},
    975     },
    976 
    977     {
    978       "HTTP/1.1 200 OK\r\n"
    979       "Content-Length: 8\r\n"
    980       "Content-Type: text/plain\r\n"
    981       "\r\n"
    982       "quxcorge",
    983 
    984       200, "OK",
    985       {{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
    986       8, { "qux", "corge" }
    987     },
    988 
    989     {
    990       "HTTP/1.1 200 OK\r\n"
    991       "Transfer-Encoding: chunked\r\n"
    992       "Content-Type: text/plain\r\n"
    993       "\r\n"
    994       "3\r\n"
    995       "qux\r\n"
    996       "5\r\n"
    997       "corge\r\n"
    998       "0\r\n"
    999       "\r\n",
   1000 
   1001       200, "OK",
   1002       {{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
   1003       nullptr, { "qux", "corge" }
   1004     },
   1005   };
   1006 
   1007   // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents RESPONSE_TEST_CASES from implicitly
   1008   //   casting to our return type.
   1009   return kj::arrayPtr(RESPONSE_TEST_CASES, kj::size(RESPONSE_TEST_CASES));
   1010 }
   1011 
   1012 KJ_TEST("HttpClient requests") {
   1013   KJ_HTTP_TEST_SETUP_IO;
   1014 
   1015   for (auto& testCase: requestTestCases()) {
   1016     if (testCase.side == SERVER_ONLY) continue;
   1017     KJ_CONTEXT(testCase.raw);
   1018     testHttpClientRequest(waitScope, testCase, KJ_HTTP_TEST_CREATE_2PIPE);
   1019   }
   1020 }
   1021 
   1022 KJ_TEST("HttpClient responses") {
   1023   KJ_HTTP_TEST_SETUP_IO;
   1024   size_t FRAGMENT_SIZES[] = { 1, 2, 3, 4, 5, 6, 7, 8, 16, 31, kj::maxValue };
   1025 
   1026   for (auto& testCase: responseTestCases()) {
   1027     if (testCase.side == SERVER_ONLY) continue;
   1028     for (size_t fragmentSize: FRAGMENT_SIZES) {
   1029       KJ_CONTEXT(testCase.raw, fragmentSize);
   1030       testHttpClientResponse(waitScope, testCase, fragmentSize, KJ_HTTP_TEST_CREATE_2PIPE);
   1031     }
   1032   }
   1033 }
   1034 
   1035 KJ_TEST("HttpClient canceled write") {
   1036   KJ_HTTP_TEST_SETUP_IO;
   1037 
   1038   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1039 
   1040   auto serverPromise = pipe.ends[1]->readAllText();
   1041 
   1042   {
   1043     HttpHeaderTable table;
   1044     auto client = newHttpClient(table, *pipe.ends[0]);
   1045 
   1046     auto body = kj::heapArray<byte>(4096);
   1047     memset(body.begin(), 0xcf, body.size());
   1048 
   1049     auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table), uint64_t(4096));
   1050 
   1051     // Note: This poll() forces the server to receive what was written so far. Otherwise,
   1052     //   cancelling the write below may in fact cancel the previous write as well.
   1053     KJ_EXPECT(!serverPromise.poll(waitScope));
   1054 
   1055     // Start a write and immediately cancel it.
   1056     {
   1057       auto ignore KJ_UNUSED = req.body->write(body.begin(), body.size());
   1058     }
   1059 
   1060     KJ_EXPECT_THROW_MESSAGE("overwrote", req.body->write("foo", 3).wait(waitScope));
   1061     req.body = nullptr;
   1062 
   1063     KJ_EXPECT(!serverPromise.poll(waitScope));
   1064 
   1065     KJ_EXPECT_THROW_MESSAGE("can't start new request until previous request body",
   1066         client->request(HttpMethod::GET, "/", HttpHeaders(table)).response.wait(waitScope));
   1067   }
   1068 
   1069   pipe.ends[0]->shutdownWrite();
   1070   auto text = serverPromise.wait(waitScope);
   1071   KJ_EXPECT(text == "POST / HTTP/1.1\r\nContent-Length: 4096\r\n\r\n", text);
   1072 }
   1073 
   1074 KJ_TEST("HttpClient chunked body gather-write") {
   1075   KJ_HTTP_TEST_SETUP_IO;
   1076 
   1077   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1078 
   1079   auto serverPromise = pipe.ends[1]->readAllText();
   1080 
   1081   {
   1082     HttpHeaderTable table;
   1083     auto client = newHttpClient(table, *pipe.ends[0]);
   1084 
   1085     auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table));
   1086 
   1087     kj::ArrayPtr<const byte> bodyParts[] = {
   1088       "foo"_kj.asBytes(), " "_kj.asBytes(),
   1089       "bar"_kj.asBytes(), " "_kj.asBytes(),
   1090       "baz"_kj.asBytes()
   1091     };
   1092 
   1093     req.body->write(kj::arrayPtr(bodyParts, kj::size(bodyParts))).wait(waitScope);
   1094     req.body = nullptr;
   1095 
   1096     // Wait for a response so the client has a chance to end the request body with a 0-chunk.
   1097     kj::StringPtr responseText = "HTTP/1.1 204 No Content\r\n\r\n";
   1098     pipe.ends[1]->write(responseText.begin(), responseText.size()).wait(waitScope);
   1099     auto response = req.response.wait(waitScope);
   1100   }
   1101 
   1102   pipe.ends[0]->shutdownWrite();
   1103 
   1104   auto text = serverPromise.wait(waitScope);
   1105   KJ_EXPECT(text == "POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
   1106                     "b\r\nfoo bar baz\r\n0\r\n\r\n", text);
   1107 }
   1108 
   1109 KJ_TEST("HttpClient chunked body pump from fixed length stream") {
   1110   class FixedBodyStream final: public kj::AsyncInputStream {
   1111     Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
   1112       auto n = kj::min(body.size(), maxBytes);
   1113       n = kj::max(n, minBytes);
   1114       n = kj::min(n, body.size());
   1115       memcpy(buffer, body.begin(), n);
   1116       body = body.slice(n);
   1117       return n;
   1118     }
   1119 
   1120     Maybe<uint64_t> tryGetLength() override { return body.size(); }
   1121 
   1122     kj::StringPtr body = "foo bar baz";
   1123   };
   1124 
   1125   KJ_HTTP_TEST_SETUP_IO;
   1126 
   1127   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1128 
   1129   auto serverPromise = pipe.ends[1]->readAllText();
   1130 
   1131   {
   1132     HttpHeaderTable table;
   1133     auto client = newHttpClient(table, *pipe.ends[0]);
   1134 
   1135     auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table));
   1136 
   1137     FixedBodyStream bodyStream;
   1138     bodyStream.pumpTo(*req.body).wait(waitScope);
   1139     req.body = nullptr;
   1140 
   1141     // Wait for a response so the client has a chance to end the request body with a 0-chunk.
   1142     kj::StringPtr responseText = "HTTP/1.1 204 No Content\r\n\r\n";
   1143     pipe.ends[1]->write(responseText.begin(), responseText.size()).wait(waitScope);
   1144     auto response = req.response.wait(waitScope);
   1145   }
   1146 
   1147   pipe.ends[0]->shutdownWrite();
   1148 
   1149   auto text = serverPromise.wait(waitScope);
   1150   KJ_EXPECT(text == "POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
   1151                     "b\r\nfoo bar baz\r\n0\r\n\r\n", text);
   1152 }
   1153 
   1154 KJ_TEST("HttpServer requests") {
   1155   HttpResponseTestCase RESPONSE = {
   1156     "HTTP/1.1 200 OK\r\n"
   1157     "Content-Length: 3\r\n"
   1158     "\r\n"
   1159     "foo",
   1160 
   1161     200, "OK",
   1162     {},
   1163     3, {"foo"}
   1164   };
   1165 
   1166   HttpResponseTestCase HEAD_RESPONSE = {
   1167     "HTTP/1.1 200 OK\r\n"
   1168     "Content-Length: 3\r\n"
   1169     "\r\n",
   1170 
   1171     200, "OK",
   1172     {},
   1173     3, {"foo"}
   1174   };
   1175 
   1176   KJ_HTTP_TEST_SETUP_IO;
   1177   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   1178 
   1179   for (auto& testCase: requestTestCases()) {
   1180     if (testCase.side == CLIENT_ONLY) continue;
   1181     KJ_CONTEXT(testCase.raw);
   1182     testHttpServerRequest(waitScope, timer, testCase,
   1183         testCase.method == HttpMethod::HEAD ? HEAD_RESPONSE : RESPONSE,
   1184         KJ_HTTP_TEST_CREATE_2PIPE);
   1185   }
   1186 }
   1187 
   1188 KJ_TEST("HttpServer responses") {
   1189   HttpRequestTestCase REQUEST = {
   1190     "GET / HTTP/1.1\r\n"
   1191     "\r\n",
   1192 
   1193     HttpMethod::GET,
   1194     "/",
   1195     {},
   1196     uint64_t(0), {},
   1197   };
   1198 
   1199   HttpRequestTestCase HEAD_REQUEST = {
   1200     "HEAD / HTTP/1.1\r\n"
   1201     "\r\n",
   1202 
   1203     HttpMethod::HEAD,
   1204     "/",
   1205     {},
   1206     uint64_t(0), {},
   1207   };
   1208 
   1209   KJ_HTTP_TEST_SETUP_IO;
   1210   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   1211 
   1212   for (auto& testCase: responseTestCases()) {
   1213     if (testCase.side == CLIENT_ONLY) continue;
   1214     KJ_CONTEXT(testCase.raw);
   1215     testHttpServerRequest(waitScope, timer,
   1216         testCase.method == HttpMethod::HEAD ? HEAD_REQUEST : REQUEST, testCase,
   1217         KJ_HTTP_TEST_CREATE_2PIPE);
   1218   }
   1219 }
   1220 
   1221 // -----------------------------------------------------------------------------
   1222 
   1223 kj::ArrayPtr<const HttpTestCase> pipelineTestCases() {
   1224   static const HttpTestCase PIPELINE_TESTS[] = {
   1225     {
   1226       {
   1227         "GET / HTTP/1.1\r\n"
   1228         "\r\n",
   1229 
   1230         HttpMethod::GET, "/", {}, uint64_t(0), {},
   1231       },
   1232       {
   1233         "HTTP/1.1 200 OK\r\n"
   1234         "Content-Length: 7\r\n"
   1235         "\r\n"
   1236         "foo bar",
   1237 
   1238         200, "OK", {}, 7, { "foo bar" }
   1239       },
   1240     },
   1241 
   1242     {
   1243       {
   1244         "POST /foo HTTP/1.1\r\n"
   1245         "Content-Length: 6\r\n"
   1246         "\r\n"
   1247         "grault",
   1248 
   1249         HttpMethod::POST, "/foo", {}, 6, { "grault" },
   1250       },
   1251       {
   1252         "HTTP/1.1 404 Not Found\r\n"
   1253         "Content-Length: 13\r\n"
   1254         "\r\n"
   1255         "baz qux corge",
   1256 
   1257         404, "Not Found", {}, 13, { "baz qux corge" }
   1258       },
   1259     },
   1260 
   1261     // Throw a zero-size request/response into the pipeline to check for a bug that existed with
   1262     // them previously.
   1263     {
   1264       {
   1265         "POST /foo HTTP/1.1\r\n"
   1266         "Content-Length: 0\r\n"
   1267         "\r\n",
   1268 
   1269         HttpMethod::POST, "/foo", {}, uint64_t(0), {},
   1270       },
   1271       {
   1272         "HTTP/1.1 200 OK\r\n"
   1273         "Content-Length: 0\r\n"
   1274         "\r\n",
   1275 
   1276         200, "OK", {}, uint64_t(0), {}
   1277       },
   1278     },
   1279 
   1280     // Also a zero-size chunked request/response.
   1281     {
   1282       {
   1283         "POST /foo HTTP/1.1\r\n"
   1284         "Transfer-Encoding: chunked\r\n"
   1285         "\r\n"
   1286         "0\r\n"
   1287         "\r\n",
   1288 
   1289         HttpMethod::POST, "/foo", {}, nullptr, {},
   1290       },
   1291       {
   1292         "HTTP/1.1 200 OK\r\n"
   1293         "Transfer-Encoding: chunked\r\n"
   1294         "\r\n"
   1295         "0\r\n"
   1296         "\r\n",
   1297 
   1298         200, "OK", {}, nullptr, {}
   1299       },
   1300     },
   1301 
   1302     {
   1303       {
   1304         "POST /bar HTTP/1.1\r\n"
   1305         "Transfer-Encoding: chunked\r\n"
   1306         "\r\n"
   1307         "6\r\n"
   1308         "garply\r\n"
   1309         "5\r\n"
   1310         "waldo\r\n"
   1311         "0\r\n"
   1312         "\r\n",
   1313 
   1314         HttpMethod::POST, "/bar", {}, nullptr, { "garply", "waldo" },
   1315       },
   1316       {
   1317         "HTTP/1.1 200 OK\r\n"
   1318         "Transfer-Encoding: chunked\r\n"
   1319         "\r\n"
   1320         "4\r\n"
   1321         "fred\r\n"
   1322         "5\r\n"
   1323         "plugh\r\n"
   1324         "0\r\n"
   1325         "\r\n",
   1326 
   1327         200, "OK", {}, nullptr, { "fred", "plugh" }
   1328       },
   1329     },
   1330 
   1331     {
   1332       {
   1333         "HEAD / HTTP/1.1\r\n"
   1334         "\r\n",
   1335 
   1336         HttpMethod::HEAD, "/", {}, uint64_t(0), {},
   1337       },
   1338       {
   1339         "HTTP/1.1 200 OK\r\n"
   1340         "Content-Length: 7\r\n"
   1341         "\r\n",
   1342 
   1343         200, "OK", {}, 7, { "foo bar" }
   1344       },
   1345     },
   1346 
   1347     // Zero-length expected size response to HEAD request has no Content-Length header.
   1348     {
   1349       {
   1350         "HEAD / HTTP/1.1\r\n"
   1351         "\r\n",
   1352 
   1353         HttpMethod::HEAD, "/", {}, uint64_t(0), {},
   1354       },
   1355       {
   1356         "HTTP/1.1 200 OK\r\n"
   1357         "\r\n",
   1358 
   1359         200, "OK", {}, uint64_t(0), {}, HttpMethod::HEAD,
   1360       },
   1361     },
   1362   };
   1363 
   1364   // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents RESPONSE_TEST_CASES from implicitly
   1365   //   casting to our return type.
   1366   return kj::arrayPtr(PIPELINE_TESTS, kj::size(PIPELINE_TESTS));
   1367 }
   1368 
   1369 KJ_TEST("HttpClient pipeline") {
   1370   auto PIPELINE_TESTS = pipelineTestCases();
   1371 
   1372   KJ_HTTP_TEST_SETUP_IO;
   1373   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1374 
   1375   kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
   1376   for (auto& testCase: PIPELINE_TESTS) {
   1377     writeResponsesPromise = writeResponsesPromise
   1378         .then([&]() {
   1379       return expectRead(*pipe.ends[1], testCase.request.raw);
   1380     }).then([&]() {
   1381       return pipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size());
   1382     });
   1383   }
   1384 
   1385   HttpHeaderTable table;
   1386   auto client = newHttpClient(table, *pipe.ends[0]);
   1387 
   1388   for (auto& testCase: PIPELINE_TESTS) {
   1389     testHttpClient(waitScope, table, *client, testCase);
   1390   }
   1391 
   1392   client = nullptr;
   1393   pipe.ends[0]->shutdownWrite();
   1394 
   1395   writeResponsesPromise.wait(waitScope);
   1396 }
   1397 
   1398 KJ_TEST("HttpClient parallel pipeline") {
   1399   auto PIPELINE_TESTS = pipelineTestCases();
   1400 
   1401   KJ_HTTP_TEST_SETUP_IO;
   1402   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1403 
   1404   kj::Promise<void> readRequestsPromise = kj::READY_NOW;
   1405   kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
   1406   for (auto& testCase: PIPELINE_TESTS) {
   1407     auto forked = readRequestsPromise
   1408         .then([&]() {
   1409       return expectRead(*pipe.ends[1], testCase.request.raw);
   1410     }).fork();
   1411     readRequestsPromise = forked.addBranch();
   1412 
   1413     // Don't write each response until the corresponding request is received.
   1414     auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
   1415     promises.add(forked.addBranch());
   1416     promises.add(kj::mv(writeResponsesPromise));
   1417     writeResponsesPromise = kj::joinPromises(promises.finish()).then([&]() {
   1418       return pipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size());
   1419     });
   1420   }
   1421 
   1422   HttpHeaderTable table;
   1423   auto client = newHttpClient(table, *pipe.ends[0]);
   1424 
   1425   auto responsePromises = KJ_MAP(testCase, PIPELINE_TESTS) {
   1426     KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
   1427 
   1428     HttpHeaders headers(table);
   1429     for (auto& header: testCase.request.requestHeaders) {
   1430       headers.set(header.id, header.value);
   1431     }
   1432 
   1433     auto request = client->request(
   1434         testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
   1435     for (auto& part: testCase.request.requestBodyParts) {
   1436       request.body->write(part.begin(), part.size()).wait(waitScope);
   1437     }
   1438 
   1439     return kj::mv(request.response);
   1440   };
   1441 
   1442   for (auto i: kj::indices(PIPELINE_TESTS)) {
   1443     auto& testCase = PIPELINE_TESTS[i];
   1444     auto response = responsePromises[i].wait(waitScope);
   1445 
   1446     KJ_EXPECT(response.statusCode == testCase.response.statusCode);
   1447     auto body = response.body->readAllText().wait(waitScope);
   1448     if (testCase.request.method == HttpMethod::HEAD) {
   1449       KJ_EXPECT(body == "");
   1450     } else {
   1451       KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
   1452     }
   1453   }
   1454 
   1455   client = nullptr;
   1456   pipe.ends[0]->shutdownWrite();
   1457 
   1458   writeResponsesPromise.wait(waitScope);
   1459 }
   1460 
   1461 KJ_TEST("HttpServer pipeline") {
   1462   auto PIPELINE_TESTS = pipelineTestCases();
   1463 
   1464   KJ_HTTP_TEST_SETUP_IO;
   1465   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   1466   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1467 
   1468   HttpHeaderTable table;
   1469   TestHttpService service(PIPELINE_TESTS, table);
   1470   HttpServer server(timer, table, service);
   1471 
   1472   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   1473 
   1474   for (auto& testCase: PIPELINE_TESTS) {
   1475     KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
   1476 
   1477     pipe.ends[1]->write(testCase.request.raw.begin(), testCase.request.raw.size())
   1478         .wait(waitScope);
   1479 
   1480     expectRead(*pipe.ends[1], testCase.response.raw).wait(waitScope);
   1481   }
   1482 
   1483   pipe.ends[1]->shutdownWrite();
   1484   listenTask.wait(waitScope);
   1485 
   1486   KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
   1487 }
   1488 
   1489 KJ_TEST("HttpServer parallel pipeline") {
   1490   auto PIPELINE_TESTS = pipelineTestCases();
   1491 
   1492   KJ_HTTP_TEST_SETUP_IO;
   1493   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   1494   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1495 
   1496   auto allRequestText =
   1497       kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.request.raw; }, "");
   1498   auto allResponseText =
   1499       kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.response.raw; }, "");
   1500 
   1501   HttpHeaderTable table;
   1502   TestHttpService service(PIPELINE_TESTS, table);
   1503   HttpServer server(timer, table, service);
   1504 
   1505   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   1506 
   1507   pipe.ends[1]->write(allRequestText.begin(), allRequestText.size()).wait(waitScope);
   1508   pipe.ends[1]->shutdownWrite();
   1509 
   1510   auto rawResponse = pipe.ends[1]->readAllText().wait(waitScope);
   1511   KJ_EXPECT(rawResponse == allResponseText, rawResponse);
   1512 
   1513   listenTask.wait(waitScope);
   1514 
   1515   KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
   1516 }
   1517 
   1518 KJ_TEST("HttpClient <-> HttpServer") {
   1519   auto PIPELINE_TESTS = pipelineTestCases();
   1520 
   1521   KJ_HTTP_TEST_SETUP_IO;
   1522   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   1523   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1524 
   1525   HttpHeaderTable table;
   1526   TestHttpService service(PIPELINE_TESTS, table);
   1527   HttpServer server(timer, table, service);
   1528 
   1529   auto listenTask = server.listenHttp(kj::mv(pipe.ends[1]));
   1530   auto client = newHttpClient(table, *pipe.ends[0]);
   1531 
   1532   for (auto& testCase: PIPELINE_TESTS) {
   1533     testHttpClient(waitScope, table, *client, testCase);
   1534   }
   1535 
   1536   client = nullptr;
   1537   pipe.ends[0]->shutdownWrite();
   1538   listenTask.wait(waitScope);
   1539   KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
   1540 }
   1541 
   1542 // -----------------------------------------------------------------------------
   1543 
   1544 KJ_TEST("HttpInputStream requests") {
   1545   KJ_HTTP_TEST_SETUP_IO;
   1546 
   1547   kj::HttpHeaderTable table;
   1548 
   1549   auto pipe = kj::newOneWayPipe();
   1550   auto input = newHttpInputStream(*pipe.in, table);
   1551 
   1552   kj::Promise<void> writeQueue = kj::READY_NOW;
   1553 
   1554   for (auto& testCase: requestTestCases()) {
   1555     writeQueue = writeQueue.then([&]() {
   1556       return pipe.out->write(testCase.raw.begin(), testCase.raw.size());
   1557     });
   1558   }
   1559   writeQueue = writeQueue.then([&]() {
   1560     pipe.out = nullptr;
   1561   });
   1562 
   1563   for (auto& testCase: requestTestCases()) {
   1564     KJ_CONTEXT(testCase.raw);
   1565 
   1566     KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
   1567 
   1568     auto req = input->readRequest().wait(waitScope);
   1569     KJ_EXPECT(req.method == testCase.method);
   1570     KJ_EXPECT(req.url == testCase.path);
   1571     for (auto& header: testCase.requestHeaders) {
   1572       KJ_EXPECT(KJ_ASSERT_NONNULL(req.headers.get(header.id)) == header.value);
   1573     }
   1574     auto body = req.body->readAllText().wait(waitScope);
   1575     KJ_EXPECT(body == kj::strArray(testCase.requestBodyParts, ""));
   1576   }
   1577 
   1578   writeQueue.wait(waitScope);
   1579   KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
   1580 }
   1581 
   1582 KJ_TEST("HttpInputStream responses") {
   1583   KJ_HTTP_TEST_SETUP_IO;
   1584 
   1585   kj::HttpHeaderTable table;
   1586 
   1587   auto pipe = kj::newOneWayPipe();
   1588   auto input = newHttpInputStream(*pipe.in, table);
   1589 
   1590   kj::Promise<void> writeQueue = kj::READY_NOW;
   1591 
   1592   for (auto& testCase: responseTestCases()) {
   1593     if (testCase.side == CLIENT_ONLY) continue;  // skip Connection: close case.
   1594     writeQueue = writeQueue.then([&]() {
   1595       return pipe.out->write(testCase.raw.begin(), testCase.raw.size());
   1596     });
   1597   }
   1598   writeQueue = writeQueue.then([&]() {
   1599     pipe.out = nullptr;
   1600   });
   1601 
   1602   for (auto& testCase: responseTestCases()) {
   1603     if (testCase.side == CLIENT_ONLY) continue;  // skip Connection: close case.
   1604     KJ_CONTEXT(testCase.raw);
   1605 
   1606     KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
   1607 
   1608     auto resp = input->readResponse(testCase.method).wait(waitScope);
   1609     KJ_EXPECT(resp.statusCode == testCase.statusCode);
   1610     KJ_EXPECT(resp.statusText == testCase.statusText);
   1611     for (auto& header: testCase.responseHeaders) {
   1612       KJ_EXPECT(KJ_ASSERT_NONNULL(resp.headers.get(header.id)) == header.value);
   1613     }
   1614     auto body = resp.body->readAllText().wait(waitScope);
   1615     KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, ""));
   1616   }
   1617 
   1618   writeQueue.wait(waitScope);
   1619   KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
   1620 }
   1621 
   1622 KJ_TEST("HttpInputStream bare messages") {
   1623   KJ_HTTP_TEST_SETUP_IO;
   1624 
   1625   kj::HttpHeaderTable table;
   1626 
   1627   auto pipe = kj::newOneWayPipe();
   1628   auto input = newHttpInputStream(*pipe.in, table);
   1629 
   1630   kj::StringPtr messages =
   1631       "Content-Length: 6\r\n"
   1632       "\r\n"
   1633       "foobar"
   1634       "Content-Length: 11\r\n"
   1635       "Content-Type: some/type\r\n"
   1636       "\r\n"
   1637       "bazquxcorge"
   1638       "Transfer-Encoding: chunked\r\n"
   1639       "\r\n"
   1640       "6\r\n"
   1641       "grault\r\n"
   1642       "b\r\n"
   1643       "garplywaldo\r\n"
   1644       "0\r\n"
   1645       "\r\n"_kj;
   1646 
   1647   kj::Promise<void> writeTask = pipe.out->write(messages.begin(), messages.size())
   1648       .then([&]() { pipe.out = nullptr; });
   1649 
   1650   {
   1651     KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
   1652     auto message = input->readMessage().wait(waitScope);
   1653     KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_LENGTH)) == "6");
   1654     KJ_EXPECT(message.body->readAllText().wait(waitScope) == "foobar");
   1655   }
   1656   {
   1657     KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
   1658     auto message = input->readMessage().wait(waitScope);
   1659     KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_LENGTH)) == "11");
   1660     KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_TYPE)) == "some/type");
   1661     KJ_EXPECT(message.body->readAllText().wait(waitScope) == "bazquxcorge");
   1662   }
   1663   {
   1664     KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
   1665     auto message = input->readMessage().wait(waitScope);
   1666     KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::TRANSFER_ENCODING)) == "chunked");
   1667     KJ_EXPECT(message.body->readAllText().wait(waitScope) == "graultgarplywaldo");
   1668   }
   1669 
   1670   writeTask.wait(waitScope);
   1671   KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
   1672 }
   1673 
   1674 // -----------------------------------------------------------------------------
   1675 
   1676 KJ_TEST("WebSocket core protocol") {
   1677   KJ_HTTP_TEST_SETUP_IO;
   1678   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1679 
   1680   auto client = newWebSocket(kj::mv(pipe.ends[0]), nullptr);
   1681   auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
   1682 
   1683   auto mediumString = kj::strArray(kj::repeat(kj::StringPtr("123456789"), 30), "");
   1684   auto bigString = kj::strArray(kj::repeat(kj::StringPtr("123456789"), 10000), "");
   1685 
   1686   auto clientTask = client->send(kj::StringPtr("hello"))
   1687       .then([&]() { return client->send(mediumString); })
   1688       .then([&]() { return client->send(bigString); })
   1689       .then([&]() { return client->send(kj::StringPtr("world").asBytes()); })
   1690       .then([&]() { return client->close(1234, "bored"); })
   1691       .then([&]() { KJ_EXPECT(client->sentByteCount() == 90307)});
   1692 
   1693   {
   1694     auto message = server->receive().wait(waitScope);
   1695     KJ_ASSERT(message.is<kj::String>());
   1696     KJ_EXPECT(message.get<kj::String>() == "hello");
   1697   }
   1698 
   1699   {
   1700     auto message = server->receive().wait(waitScope);
   1701     KJ_ASSERT(message.is<kj::String>());
   1702     KJ_EXPECT(message.get<kj::String>() == mediumString);
   1703   }
   1704 
   1705   {
   1706     auto message = server->receive().wait(waitScope);
   1707     KJ_ASSERT(message.is<kj::String>());
   1708     KJ_EXPECT(message.get<kj::String>() == bigString);
   1709   }
   1710 
   1711   {
   1712     auto message = server->receive().wait(waitScope);
   1713     KJ_ASSERT(message.is<kj::Array<byte>>());
   1714     KJ_EXPECT(kj::str(message.get<kj::Array<byte>>().asChars()) == "world");
   1715   }
   1716 
   1717   {
   1718     auto message = server->receive().wait(waitScope);
   1719     KJ_ASSERT(message.is<WebSocket::Close>());
   1720     KJ_EXPECT(message.get<WebSocket::Close>().code == 1234);
   1721     KJ_EXPECT(message.get<WebSocket::Close>().reason == "bored");
   1722     KJ_EXPECT(server->receivedByteCount() == 90307);
   1723   }
   1724 
   1725   auto serverTask = server->close(4321, "whatever");
   1726 
   1727   {
   1728     auto message = client->receive().wait(waitScope);
   1729     KJ_ASSERT(message.is<WebSocket::Close>());
   1730     KJ_EXPECT(message.get<WebSocket::Close>().code == 4321);
   1731     KJ_EXPECT(message.get<WebSocket::Close>().reason == "whatever");
   1732     KJ_EXPECT(client->receivedByteCount() == 12);
   1733   }
   1734 
   1735   clientTask.wait(waitScope);
   1736   serverTask.wait(waitScope);
   1737 }
   1738 
   1739 KJ_TEST("WebSocket fragmented") {
   1740   KJ_HTTP_TEST_SETUP_IO;
   1741   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1742 
   1743   auto client = kj::mv(pipe.ends[0]);
   1744   auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
   1745 
   1746   byte DATA[] = {
   1747     0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ',
   1748 
   1749     0x00, 0x03, 'w', 'o', 'r',
   1750 
   1751     0x80, 0x02, 'l', 'd',
   1752   };
   1753 
   1754   auto clientTask = client->write(DATA, sizeof(DATA));
   1755 
   1756   {
   1757     auto message = server->receive().wait(waitScope);
   1758     KJ_ASSERT(message.is<kj::String>());
   1759     KJ_EXPECT(message.get<kj::String>() == "hello world");
   1760   }
   1761 
   1762   clientTask.wait(waitScope);
   1763 }
   1764 
   1765 class FakeEntropySource final: public EntropySource {
   1766 public:
   1767   void generate(kj::ArrayPtr<byte> buffer) override {
   1768     static constexpr byte DUMMY[4] = { 12, 34, 56, 78 };
   1769 
   1770     for (auto i: kj::indices(buffer)) {
   1771       buffer[i] = DUMMY[i % sizeof(DUMMY)];
   1772     }
   1773   }
   1774 };
   1775 
   1776 KJ_TEST("WebSocket masked") {
   1777   KJ_HTTP_TEST_SETUP_IO;
   1778   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1779   FakeEntropySource maskGenerator;
   1780 
   1781   auto client = kj::mv(pipe.ends[0]);
   1782   auto server = newWebSocket(kj::mv(pipe.ends[1]), maskGenerator);
   1783 
   1784   byte DATA[] = {
   1785     0x81, 0x86, 12, 34, 56, 78, 'h' ^ 12, 'e' ^ 34, 'l' ^ 56, 'l' ^ 78, 'o' ^ 12, ' ' ^ 34,
   1786   };
   1787 
   1788   auto clientTask = client->write(DATA, sizeof(DATA));
   1789   auto serverTask = server->send(kj::StringPtr("hello "));
   1790 
   1791   {
   1792     auto message = server->receive().wait(waitScope);
   1793     KJ_ASSERT(message.is<kj::String>());
   1794     KJ_EXPECT(message.get<kj::String>() == "hello ");
   1795   }
   1796 
   1797   expectRead(*client, DATA).wait(waitScope);
   1798 
   1799   clientTask.wait(waitScope);
   1800   serverTask.wait(waitScope);
   1801 }
   1802 
   1803 KJ_TEST("WebSocket unsolicited pong") {
   1804   KJ_HTTP_TEST_SETUP_IO;
   1805   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1806 
   1807   auto client = kj::mv(pipe.ends[0]);
   1808   auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
   1809 
   1810   byte DATA[] = {
   1811     0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ',
   1812 
   1813     0x8A, 0x03, 'f', 'o', 'o',
   1814 
   1815     0x80, 0x05, 'w', 'o', 'r', 'l', 'd',
   1816   };
   1817 
   1818   auto clientTask = client->write(DATA, sizeof(DATA));
   1819 
   1820   {
   1821     auto message = server->receive().wait(waitScope);
   1822     KJ_ASSERT(message.is<kj::String>());
   1823     KJ_EXPECT(message.get<kj::String>() == "hello world");
   1824   }
   1825 
   1826   clientTask.wait(waitScope);
   1827 }
   1828 
   1829 KJ_TEST("WebSocket ping") {
   1830   KJ_HTTP_TEST_SETUP_IO;
   1831   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1832 
   1833   auto client = kj::mv(pipe.ends[0]);
   1834   auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
   1835 
   1836   // Be extra-annoying by having the ping arrive between fragments.
   1837   byte DATA[] = {
   1838     0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ',
   1839 
   1840     0x89, 0x03, 'f', 'o', 'o',
   1841 
   1842     0x80, 0x05, 'w', 'o', 'r', 'l', 'd',
   1843   };
   1844 
   1845   auto clientTask = client->write(DATA, sizeof(DATA));
   1846 
   1847   {
   1848     auto message = server->receive().wait(waitScope);
   1849     KJ_ASSERT(message.is<kj::String>());
   1850     KJ_EXPECT(message.get<kj::String>() == "hello world");
   1851   }
   1852 
   1853   auto serverTask = server->send(kj::StringPtr("bar"));
   1854 
   1855   byte EXPECTED[] = {
   1856     0x8A, 0x03, 'f', 'o', 'o',  // pong
   1857     0x81, 0x03, 'b', 'a', 'r',  // message
   1858   };
   1859 
   1860   expectRead(*client, EXPECTED).wait(waitScope);
   1861 
   1862   clientTask.wait(waitScope);
   1863   serverTask.wait(waitScope);
   1864 }
   1865 
   1866 KJ_TEST("WebSocket ping mid-send") {
   1867   KJ_HTTP_TEST_SETUP_IO;
   1868   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1869 
   1870   auto client = kj::mv(pipe.ends[0]);
   1871   auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
   1872 
   1873   auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), "");
   1874   auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr);
   1875 
   1876   byte DATA[] = {
   1877     0x89, 0x03, 'f', 'o', 'o',  // ping
   1878     0x81, 0x03, 'b', 'a', 'r',  // some other message
   1879   };
   1880 
   1881   auto clientTask = client->write(DATA, sizeof(DATA));
   1882 
   1883   {
   1884     auto message = server->receive().wait(waitScope);
   1885     KJ_ASSERT(message.is<kj::String>());
   1886     KJ_EXPECT(message.get<kj::String>() == "bar");
   1887   }
   1888 
   1889   byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
   1890   expectRead(*client, EXPECTED1).wait(waitScope);
   1891   expectRead(*client, bigString).wait(waitScope);
   1892 
   1893   byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' };
   1894   expectRead(*client, EXPECTED2).wait(waitScope);
   1895 
   1896   clientTask.wait(waitScope);
   1897   serverTask.wait(waitScope);
   1898 }
   1899 
   1900 class InputOutputPair final: public kj::AsyncIoStream {
   1901   // Creates an AsyncIoStream out of an AsyncInputStream and an AsyncOutputStream.
   1902 
   1903 public:
   1904   InputOutputPair(kj::Own<kj::AsyncInputStream> in, kj::Own<kj::AsyncOutputStream> out)
   1905       : in(kj::mv(in)), out(kj::mv(out)) {}
   1906 
   1907   kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
   1908     return in->read(buffer, minBytes, maxBytes);
   1909   }
   1910   kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
   1911     return in->tryRead(buffer, minBytes, maxBytes);
   1912   }
   1913 
   1914   Maybe<uint64_t> tryGetLength() override {
   1915     return in->tryGetLength();
   1916   }
   1917 
   1918   Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount = kj::maxValue) override {
   1919     return in->pumpTo(output, amount);
   1920   }
   1921 
   1922   kj::Promise<void> write(const void* buffer, size_t size) override {
   1923     return out->write(buffer, size);
   1924   }
   1925 
   1926   kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
   1927     return out->write(pieces);
   1928   }
   1929 
   1930   kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
   1931       kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
   1932     return out->tryPumpFrom(input, amount);
   1933   }
   1934 
   1935   Promise<void> whenWriteDisconnected() override {
   1936     return out->whenWriteDisconnected();
   1937   }
   1938 
   1939   void shutdownWrite() override {
   1940     out = nullptr;
   1941   }
   1942 
   1943 private:
   1944   kj::Own<kj::AsyncInputStream> in;
   1945   kj::Own<kj::AsyncOutputStream> out;
   1946 };
   1947 
   1948 KJ_TEST("WebSocket double-ping mid-send") {
   1949   KJ_HTTP_TEST_SETUP_IO;
   1950 
   1951   auto upPipe = newOneWayPipe();
   1952   auto downPipe = newOneWayPipe();
   1953   InputOutputPair client(kj::mv(downPipe.in), kj::mv(upPipe.out));
   1954   auto server = newWebSocket(kj::heap<InputOutputPair>(kj::mv(upPipe.in), kj::mv(downPipe.out)),
   1955                              nullptr);
   1956 
   1957   auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), "");
   1958   auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr);
   1959 
   1960   byte DATA[] = {
   1961     0x89, 0x03, 'f', 'o', 'o',  // ping
   1962     0x89, 0x03, 'q', 'u', 'x',  // ping2
   1963     0x81, 0x03, 'b', 'a', 'r',  // some other message
   1964   };
   1965 
   1966   auto clientTask = client.write(DATA, sizeof(DATA));
   1967 
   1968   {
   1969     auto message = server->receive().wait(waitScope);
   1970     KJ_ASSERT(message.is<kj::String>());
   1971     KJ_EXPECT(message.get<kj::String>() == "bar");
   1972   }
   1973 
   1974   byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
   1975   expectRead(client, EXPECTED1).wait(waitScope);
   1976   expectRead(client, bigString).wait(waitScope);
   1977 
   1978   byte EXPECTED2[] = { 0x8A, 0x03, 'q', 'u', 'x' };
   1979   expectRead(client, EXPECTED2).wait(waitScope);
   1980 
   1981   clientTask.wait(waitScope);
   1982   serverTask.wait(waitScope);
   1983 }
   1984 
   1985 KJ_TEST("WebSocket ping received during pong send") {
   1986   KJ_HTTP_TEST_SETUP_IO;
   1987   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   1988 
   1989   auto client = kj::mv(pipe.ends[0]);
   1990   auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
   1991 
   1992   // Send a very large ping so that sending the pong takes a while. Then send a second ping
   1993   // immediately after.
   1994   byte PREFIX[] = { 0x89, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
   1995   auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), "");
   1996   byte POSTFIX[] = {
   1997     0x89, 0x03, 'f', 'o', 'o',
   1998     0x81, 0x03, 'b', 'a', 'r',
   1999   };
   2000 
   2001   kj::ArrayPtr<const byte> parts[] = {PREFIX, bigString.asBytes(), POSTFIX};
   2002   auto clientTask = client->write(parts);
   2003 
   2004   {
   2005     auto message = server->receive().wait(waitScope);
   2006     KJ_ASSERT(message.is<kj::String>());
   2007     KJ_EXPECT(message.get<kj::String>() == "bar");
   2008   }
   2009 
   2010   byte EXPECTED1[] = { 0x8A, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
   2011   expectRead(*client, EXPECTED1).wait(waitScope);
   2012   expectRead(*client, bigString).wait(waitScope);
   2013 
   2014   byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' };
   2015   expectRead(*client, EXPECTED2).wait(waitScope);
   2016 
   2017   clientTask.wait(waitScope);
   2018 }
   2019 
   2020 KJ_TEST("WebSocket pump byte counting") {
   2021   KJ_HTTP_TEST_SETUP_IO;
   2022   auto pipe1 = KJ_HTTP_TEST_CREATE_2PIPE;
   2023   auto pipe2 = KJ_HTTP_TEST_CREATE_2PIPE;
   2024 
   2025   FakeEntropySource maskGenerator;
   2026   auto server1 = newWebSocket(kj::mv(pipe1.ends[1]), nullptr);
   2027   auto client2 = newWebSocket(kj::mv(pipe2.ends[0]), maskGenerator);
   2028   auto server2 = newWebSocket(kj::mv(pipe2.ends[1]), nullptr);
   2029 
   2030   auto pumpTask = server1->pumpTo(*client2);
   2031   auto receiveTask = server2->receive();
   2032 
   2033   // Client sends three bytes of a valid message then disconnects.
   2034   const char DATA[] = {0x01, 0x06, 'h'};
   2035   pipe1.ends[0]->write(DATA, 3).wait(waitScope);
   2036   pipe1.ends[0] = nullptr;
   2037 
   2038   // The pump completes successfully, forwarding the disconnect.
   2039   pumpTask.wait(waitScope);
   2040 
   2041   // The eventual receiver gets a disconnect execption.
   2042   // (Note: We don't use KJ_EXPECT_THROW here because under -fno-exceptions it forks and we lose
   2043   // state.)
   2044   receiveTask.then([](auto) {
   2045     KJ_FAIL_EXPECT("expected exception");
   2046   }, [](kj::Exception&& e) {
   2047     KJ_EXPECT(e.getType() == kj::Exception::Type::DISCONNECTED);
   2048   }).wait(waitScope);
   2049 
   2050   KJ_EXPECT(server1->receivedByteCount() == 3);
   2051 #if KJ_NO_RTTI
   2052   // Optimized socket pump will be disabled, so only whole messages are counted by client2/server2.
   2053   KJ_EXPECT(client2->sentByteCount() == 0);
   2054   KJ_EXPECT(server2->receivedByteCount() == 0);
   2055 #else
   2056   KJ_EXPECT(client2->sentByteCount() == 3);
   2057   KJ_EXPECT(server2->receivedByteCount() == 3);
   2058 #endif
   2059 }
   2060 
   2061 KJ_TEST("WebSocket pump disconnect on send") {
   2062   KJ_HTTP_TEST_SETUP_IO;
   2063   auto pipe1 = KJ_HTTP_TEST_CREATE_2PIPE;
   2064   auto pipe2 = KJ_HTTP_TEST_CREATE_2PIPE;
   2065 
   2066   FakeEntropySource maskGenerator;
   2067   auto client1 = newWebSocket(kj::mv(pipe1.ends[0]), maskGenerator);
   2068   auto server1 = newWebSocket(kj::mv(pipe1.ends[1]), nullptr);
   2069   auto client2 = newWebSocket(kj::mv(pipe2.ends[0]), maskGenerator);
   2070 
   2071   auto pumpTask = server1->pumpTo(*client2);
   2072   auto sendTask = client1->send("hello"_kj);
   2073 
   2074   // Endpoint reads three bytes and then disconnects.
   2075   char buffer[3];
   2076   pipe2.ends[1]->read(buffer, 3).wait(waitScope);
   2077   pipe2.ends[1] = nullptr;
   2078 
   2079   // Pump throws disconnected.
   2080   KJ_EXPECT_THROW_RECOVERABLE(DISCONNECTED, pumpTask.wait(waitScope));
   2081 
   2082   // client1 may or may not have been able to send its whole message depending on buffering.
   2083   sendTask.then([]() {}, [](kj::Exception&& e) {
   2084     KJ_EXPECT(e.getType() == kj::Exception::Type::DISCONNECTED);
   2085   }).wait(waitScope);
   2086 }
   2087 
   2088 KJ_TEST("WebSocket pump disconnect on receive") {
   2089   KJ_HTTP_TEST_SETUP_IO;
   2090   auto pipe1 = KJ_HTTP_TEST_CREATE_2PIPE;
   2091   auto pipe2 = KJ_HTTP_TEST_CREATE_2PIPE;
   2092 
   2093   FakeEntropySource maskGenerator;
   2094   auto server1 = newWebSocket(kj::mv(pipe1.ends[1]), nullptr);
   2095   auto client2 = newWebSocket(kj::mv(pipe2.ends[0]), maskGenerator);
   2096   auto server2 = newWebSocket(kj::mv(pipe2.ends[1]), nullptr);
   2097 
   2098   auto pumpTask = server1->pumpTo(*client2);
   2099   auto receiveTask = server2->receive();
   2100 
   2101   // Client sends three bytes of a valid message then disconnects.
   2102   const char DATA[] = {0x01, 0x06, 'h'};
   2103   pipe1.ends[0]->write(DATA, 3).wait(waitScope);
   2104   pipe1.ends[0] = nullptr;
   2105 
   2106   // The pump completes successfully, forwarding the disconnect.
   2107   pumpTask.wait(waitScope);
   2108 
   2109   // The eventual receiver gets a disconnect execption.
   2110   KJ_EXPECT_THROW(DISCONNECTED, receiveTask.wait(waitScope));
   2111 }
   2112 
   2113 class TestWebSocketService final: public HttpService, private kj::TaskSet::ErrorHandler {
   2114 public:
   2115   TestWebSocketService(HttpHeaderTable& headerTable, HttpHeaderId hMyHeader)
   2116       : headerTable(headerTable), hMyHeader(hMyHeader), tasks(*this) {}
   2117 
   2118   kj::Promise<void> request(
   2119       HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
   2120       kj::AsyncInputStream& requestBody, Response& response) override {
   2121     KJ_ASSERT(headers.isWebSocket());
   2122 
   2123     HttpHeaders responseHeaders(headerTable);
   2124     KJ_IF_MAYBE(h, headers.get(hMyHeader)) {
   2125       responseHeaders.set(hMyHeader, kj::str("respond-", *h));
   2126     }
   2127 
   2128     if (url == "/return-error") {
   2129       response.send(404, "Not Found", responseHeaders, uint64_t(0));
   2130       return kj::READY_NOW;
   2131     } else if (url == "/websocket") {
   2132       auto ws = response.acceptWebSocket(responseHeaders);
   2133       return doWebSocket(*ws, "start-inline").attach(kj::mv(ws));
   2134     } else {
   2135       KJ_FAIL_ASSERT("unexpected path", url);
   2136     }
   2137   }
   2138 
   2139 private:
   2140   HttpHeaderTable& headerTable;
   2141   HttpHeaderId hMyHeader;
   2142   kj::TaskSet tasks;
   2143 
   2144   void taskFailed(kj::Exception&& exception) override {
   2145     KJ_LOG(ERROR, exception);
   2146   }
   2147 
   2148   static kj::Promise<void> doWebSocket(WebSocket& ws, kj::StringPtr message) {
   2149     auto copy = kj::str(message);
   2150     return ws.send(copy).attach(kj::mv(copy))
   2151         .then([&ws]() {
   2152       return ws.receive();
   2153     }).then([&ws](WebSocket::Message&& message) {
   2154       KJ_SWITCH_ONEOF(message) {
   2155         KJ_CASE_ONEOF(str, kj::String) {
   2156           return doWebSocket(ws, kj::str("reply:", str));
   2157         }
   2158         KJ_CASE_ONEOF(data, kj::Array<byte>) {
   2159           return doWebSocket(ws, kj::str("reply:", data));
   2160         }
   2161         KJ_CASE_ONEOF(close, WebSocket::Close) {
   2162           auto reason = kj::str("close-reply:", close.reason);
   2163           return ws.close(close.code + 1, reason).attach(kj::mv(reason));
   2164         }
   2165       }
   2166       KJ_UNREACHABLE;
   2167     });
   2168   }
   2169 };
   2170 
   2171 const char WEBSOCKET_REQUEST_HANDSHAKE[] =
   2172     " HTTP/1.1\r\n"
   2173     "Connection: Upgrade\r\n"
   2174     "Upgrade: websocket\r\n"
   2175     "Sec-WebSocket-Key: DCI4TgwiOE4MIjhODCI4Tg==\r\n"
   2176     "Sec-WebSocket-Version: 13\r\n"
   2177     "My-Header: foo\r\n"
   2178     "\r\n";
   2179 const char WEBSOCKET_RESPONSE_HANDSHAKE[] =
   2180     "HTTP/1.1 101 Switching Protocols\r\n"
   2181     "Connection: Upgrade\r\n"
   2182     "Upgrade: websocket\r\n"
   2183     "Sec-WebSocket-Accept: pShtIFKT0s8RYZvnWY/CrjQD8CM=\r\n"
   2184     "My-Header: respond-foo\r\n"
   2185     "\r\n";
   2186 const char WEBSOCKET_RESPONSE_HANDSHAKE_ERROR[] =
   2187     "HTTP/1.1 404 Not Found\r\n"
   2188     "Content-Length: 0\r\n"
   2189     "My-Header: respond-foo\r\n"
   2190     "\r\n";
   2191 const byte WEBSOCKET_FIRST_MESSAGE_INLINE[] =
   2192     { 0x81, 0x0c, 's','t','a','r','t','-','i','n','l','i','n','e' };
   2193 const byte WEBSOCKET_SEND_MESSAGE[] =
   2194     { 0x81, 0x83, 12, 34, 56, 78, 'b'^12, 'a'^34, 'r'^56 };
   2195 const byte WEBSOCKET_REPLY_MESSAGE[] =
   2196     { 0x81, 0x09, 'r','e','p','l','y',':','b','a','r' };
   2197 const byte WEBSOCKET_SEND_CLOSE[] =
   2198     { 0x88, 0x85, 12, 34, 56, 78, 0x12^12, 0x34^34, 'q'^56, 'u'^78, 'x'^12 };
   2199 const byte WEBSOCKET_REPLY_CLOSE[] =
   2200     { 0x88, 0x11, 0x12, 0x35, 'c','l','o','s','e','-','r','e','p','l','y',':','q','u','x' };
   2201 
   2202 template <size_t s>
   2203 kj::ArrayPtr<const byte> asBytes(const char (&chars)[s]) {
   2204   return kj::ArrayPtr<const char>(chars, s - 1).asBytes();
   2205 }
   2206 
   2207 void testWebSocketClient(kj::WaitScope& waitScope, HttpHeaderTable& headerTable,
   2208                          kj::HttpHeaderId hMyHeader, HttpClient& client) {
   2209   kj::HttpHeaders headers(headerTable);
   2210   headers.set(hMyHeader, "foo");
   2211   auto response = client.openWebSocket("/websocket", headers).wait(waitScope);
   2212 
   2213   KJ_EXPECT(response.statusCode == 101);
   2214   KJ_EXPECT(response.statusText == "Switching Protocols", response.statusText);
   2215   KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo");
   2216   KJ_ASSERT(response.webSocketOrBody.is<kj::Own<WebSocket>>());
   2217   auto ws = kj::mv(response.webSocketOrBody.get<kj::Own<WebSocket>>());
   2218 
   2219   {
   2220     auto message = ws->receive().wait(waitScope);
   2221     KJ_ASSERT(message.is<kj::String>());
   2222     KJ_EXPECT(message.get<kj::String>() == "start-inline");
   2223   }
   2224 
   2225   ws->send(kj::StringPtr("bar")).wait(waitScope);
   2226   {
   2227     auto message = ws->receive().wait(waitScope);
   2228     KJ_ASSERT(message.is<kj::String>());
   2229     KJ_EXPECT(message.get<kj::String>() == "reply:bar");
   2230   }
   2231 
   2232   ws->close(0x1234, "qux").wait(waitScope);
   2233   {
   2234     auto message = ws->receive().wait(waitScope);
   2235     KJ_ASSERT(message.is<WebSocket::Close>());
   2236     KJ_EXPECT(message.get<WebSocket::Close>().code == 0x1235);
   2237     KJ_EXPECT(message.get<WebSocket::Close>().reason == "close-reply:qux");
   2238   }
   2239 }
   2240 
   2241 inline kj::Promise<void> writeA(kj::AsyncOutputStream& out, kj::ArrayPtr<const byte> data) {
   2242   return out.write(data.begin(), data.size());
   2243 }
   2244 
   2245 KJ_TEST("HttpClient WebSocket handshake") {
   2246   KJ_HTTP_TEST_SETUP_IO;
   2247   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2248 
   2249   auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
   2250 
   2251   auto serverTask = expectRead(*pipe.ends[1], request)
   2252       .then([&]() { return writeA(*pipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)); })
   2253       .then([&]() { return writeA(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE); })
   2254       .then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
   2255       .then([&]() { return writeA(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE); })
   2256       .then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_CLOSE); })
   2257       .then([&]() { return writeA(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE); })
   2258       .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
   2259 
   2260   HttpHeaderTable::Builder tableBuilder;
   2261   HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
   2262   auto headerTable = tableBuilder.build();
   2263 
   2264   FakeEntropySource entropySource;
   2265   HttpClientSettings clientSettings;
   2266   clientSettings.entropySource = entropySource;
   2267 
   2268   auto client = newHttpClient(*headerTable, *pipe.ends[0], clientSettings);
   2269 
   2270   testWebSocketClient(waitScope, *headerTable, hMyHeader, *client);
   2271 
   2272   serverTask.wait(waitScope);
   2273 }
   2274 
   2275 KJ_TEST("HttpClient WebSocket error") {
   2276   KJ_HTTP_TEST_SETUP_IO;
   2277   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2278 
   2279   auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
   2280 
   2281   auto serverTask = expectRead(*pipe.ends[1], request)
   2282       .then([&]() { return writeA(*pipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE_ERROR)); })
   2283       .then([&]() { return expectRead(*pipe.ends[1], request); })
   2284       .then([&]() { return writeA(*pipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE_ERROR)); })
   2285       .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
   2286 
   2287   HttpHeaderTable::Builder tableBuilder;
   2288   HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
   2289   auto headerTable = tableBuilder.build();
   2290 
   2291   FakeEntropySource entropySource;
   2292   HttpClientSettings clientSettings;
   2293   clientSettings.entropySource = entropySource;
   2294 
   2295   auto client = newHttpClient(*headerTable, *pipe.ends[0], clientSettings);
   2296 
   2297   kj::HttpHeaders headers(*headerTable);
   2298   headers.set(hMyHeader, "foo");
   2299 
   2300   {
   2301     auto response = client->openWebSocket("/websocket", headers).wait(waitScope);
   2302 
   2303     KJ_EXPECT(response.statusCode == 404);
   2304     KJ_EXPECT(response.statusText == "Not Found", response.statusText);
   2305     KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo");
   2306     KJ_ASSERT(response.webSocketOrBody.is<kj::Own<AsyncInputStream>>());
   2307   }
   2308 
   2309   {
   2310     auto response = client->openWebSocket("/websocket", headers).wait(waitScope);
   2311 
   2312     KJ_EXPECT(response.statusCode == 404);
   2313     KJ_EXPECT(response.statusText == "Not Found", response.statusText);
   2314     KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo");
   2315     KJ_ASSERT(response.webSocketOrBody.is<kj::Own<AsyncInputStream>>());
   2316   }
   2317 
   2318   serverTask.wait(waitScope);
   2319 }
   2320 
   2321 KJ_TEST("HttpServer WebSocket handshake") {
   2322   KJ_HTTP_TEST_SETUP_IO;
   2323   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2324   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2325 
   2326   HttpHeaderTable::Builder tableBuilder;
   2327   HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
   2328   auto headerTable = tableBuilder.build();
   2329   TestWebSocketService service(*headerTable, hMyHeader);
   2330   HttpServer server(timer, *headerTable, service);
   2331 
   2332   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2333 
   2334   auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
   2335   writeA(*pipe.ends[1], request.asBytes()).wait(waitScope);
   2336   expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);
   2337 
   2338   expectRead(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
   2339   writeA(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE).wait(waitScope);
   2340   expectRead(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE).wait(waitScope);
   2341   writeA(*pipe.ends[1], WEBSOCKET_SEND_CLOSE).wait(waitScope);
   2342   expectRead(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE).wait(waitScope);
   2343 
   2344   listenTask.wait(waitScope);
   2345 }
   2346 
   2347 KJ_TEST("HttpServer WebSocket handshake error") {
   2348   KJ_HTTP_TEST_SETUP_IO;
   2349   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2350   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2351 
   2352   HttpHeaderTable::Builder tableBuilder;
   2353   HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
   2354   auto headerTable = tableBuilder.build();
   2355   TestWebSocketService service(*headerTable, hMyHeader);
   2356   HttpServer server(timer, *headerTable, service);
   2357 
   2358   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2359 
   2360   auto request = kj::str("GET /return-error", WEBSOCKET_REQUEST_HANDSHAKE);
   2361   writeA(*pipe.ends[1], request.asBytes()).wait(waitScope);
   2362   expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(waitScope);
   2363 
   2364   // Can send more requests!
   2365   writeA(*pipe.ends[1], request.asBytes()).wait(waitScope);
   2366   expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(waitScope);
   2367 
   2368   pipe.ends[1]->shutdownWrite();
   2369 
   2370   listenTask.wait(waitScope);
   2371 }
   2372 
   2373 // -----------------------------------------------------------------------------
   2374 
   2375 KJ_TEST("HttpServer request timeout") {
   2376   auto PIPELINE_TESTS = pipelineTestCases();
   2377 
   2378   KJ_HTTP_TEST_SETUP_IO;
   2379   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2380   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2381 
   2382   HttpHeaderTable table;
   2383   TestHttpService service(PIPELINE_TESTS, table);
   2384   HttpServerSettings settings;
   2385   settings.headerTimeout = 1 * kj::MILLISECONDS;
   2386   HttpServer server(timer, table, service, settings);
   2387 
   2388   // Shouldn't hang! Should time out.
   2389   auto promise = server.listenHttp(kj::mv(pipe.ends[0]));
   2390   KJ_EXPECT(!promise.poll(waitScope));
   2391   timer.advanceTo(timer.now() + settings.headerTimeout / 2);
   2392   KJ_EXPECT(!promise.poll(waitScope));
   2393   timer.advanceTo(timer.now() + settings.headerTimeout);
   2394   promise.wait(waitScope);
   2395 
   2396   // Closes the connection without sending anything.
   2397   KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
   2398 }
   2399 
   2400 KJ_TEST("HttpServer pipeline timeout") {
   2401   auto PIPELINE_TESTS = pipelineTestCases();
   2402 
   2403   KJ_HTTP_TEST_SETUP_IO;
   2404   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2405   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2406 
   2407   HttpHeaderTable table;
   2408   TestHttpService service(PIPELINE_TESTS, table);
   2409   HttpServerSettings settings;
   2410   settings.pipelineTimeout = 1 * kj::MILLISECONDS;
   2411   HttpServer server(timer, table, service, settings);
   2412 
   2413   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2414 
   2415   // Do one request.
   2416   pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
   2417       .wait(waitScope);
   2418   expectRead(*pipe.ends[1], PIPELINE_TESTS[0].response.raw).wait(waitScope);
   2419 
   2420   // Listen task should time out even though we didn't shutdown the socket.
   2421   KJ_EXPECT(!listenTask.poll(waitScope));
   2422   timer.advanceTo(timer.now() + settings.pipelineTimeout / 2);
   2423   KJ_EXPECT(!listenTask.poll(waitScope));
   2424   timer.advanceTo(timer.now() + settings.pipelineTimeout);
   2425   listenTask.wait(waitScope);
   2426 
   2427   // In this case, no data is sent back.
   2428   KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
   2429 }
   2430 
   2431 class BrokenHttpService final: public HttpService {
   2432   // HttpService that doesn't send a response.
   2433 public:
   2434   BrokenHttpService() = default;
   2435   explicit BrokenHttpService(kj::Exception&& exception): exception(kj::mv(exception)) {}
   2436 
   2437   kj::Promise<void> request(
   2438       HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
   2439       kj::AsyncInputStream& requestBody, Response& responseSender) override {
   2440     return requestBody.readAllBytes().then([this](kj::Array<byte>&&) -> kj::Promise<void> {
   2441       KJ_IF_MAYBE(e, exception) {
   2442         return kj::cp(*e);
   2443       } else {
   2444         return kj::READY_NOW;
   2445       }
   2446     });
   2447   }
   2448 
   2449 private:
   2450   kj::Maybe<kj::Exception> exception;
   2451 };
   2452 
   2453 KJ_TEST("HttpServer no response") {
   2454   auto PIPELINE_TESTS = pipelineTestCases();
   2455 
   2456   KJ_HTTP_TEST_SETUP_IO;
   2457   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2458   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2459 
   2460   HttpHeaderTable table;
   2461   BrokenHttpService service;
   2462   HttpServer server(timer, table, service);
   2463 
   2464   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2465 
   2466   // Do one request.
   2467   pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
   2468       .wait(waitScope);
   2469   auto text = pipe.ends[1]->readAllText().wait(waitScope);
   2470 
   2471   KJ_EXPECT(text ==
   2472       "HTTP/1.1 500 Internal Server Error\r\n"
   2473       "Connection: close\r\n"
   2474       "Content-Length: 51\r\n"
   2475       "Content-Type: text/plain\r\n"
   2476       "\r\n"
   2477       "ERROR: The HttpService did not generate a response.", text);
   2478 }
   2479 
   2480 KJ_TEST("HttpServer disconnected") {
   2481   auto PIPELINE_TESTS = pipelineTestCases();
   2482 
   2483   KJ_HTTP_TEST_SETUP_IO;
   2484   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2485   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2486 
   2487   HttpHeaderTable table;
   2488   BrokenHttpService service(KJ_EXCEPTION(DISCONNECTED, "disconnected"));
   2489   HttpServer server(timer, table, service);
   2490 
   2491   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2492 
   2493   // Do one request.
   2494   pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
   2495       .wait(waitScope);
   2496   auto text = pipe.ends[1]->readAllText().wait(waitScope);
   2497 
   2498   KJ_EXPECT(text == "", text);
   2499 }
   2500 
   2501 KJ_TEST("HttpServer overloaded") {
   2502   auto PIPELINE_TESTS = pipelineTestCases();
   2503 
   2504   KJ_HTTP_TEST_SETUP_IO;
   2505   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2506   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2507 
   2508   HttpHeaderTable table;
   2509   BrokenHttpService service(KJ_EXCEPTION(OVERLOADED, "overloaded"));
   2510   HttpServer server(timer, table, service);
   2511 
   2512   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2513 
   2514   // Do one request.
   2515   pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
   2516       .wait(waitScope);
   2517   auto text = pipe.ends[1]->readAllText().wait(waitScope);
   2518 
   2519   KJ_EXPECT(text.startsWith("HTTP/1.1 503 Service Unavailable"), text);
   2520 }
   2521 
   2522 KJ_TEST("HttpServer unimplemented") {
   2523   auto PIPELINE_TESTS = pipelineTestCases();
   2524 
   2525   KJ_HTTP_TEST_SETUP_IO;
   2526   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2527   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2528 
   2529   HttpHeaderTable table;
   2530   BrokenHttpService service(KJ_EXCEPTION(UNIMPLEMENTED, "unimplemented"));
   2531   HttpServer server(timer, table, service);
   2532 
   2533   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2534 
   2535   // Do one request.
   2536   pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
   2537       .wait(waitScope);
   2538   auto text = pipe.ends[1]->readAllText().wait(waitScope);
   2539 
   2540   KJ_EXPECT(text.startsWith("HTTP/1.1 501 Not Implemented"), text);
   2541 }
   2542 
   2543 KJ_TEST("HttpServer threw exception") {
   2544   auto PIPELINE_TESTS = pipelineTestCases();
   2545 
   2546   KJ_HTTP_TEST_SETUP_IO;
   2547   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2548   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2549 
   2550   HttpHeaderTable table;
   2551   BrokenHttpService service(KJ_EXCEPTION(FAILED, "failed"));
   2552   HttpServer server(timer, table, service);
   2553 
   2554   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2555 
   2556   // Do one request.
   2557   pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
   2558       .wait(waitScope);
   2559   auto text = pipe.ends[1]->readAllText().wait(waitScope);
   2560 
   2561   KJ_EXPECT(text.startsWith("HTTP/1.1 500 Internal Server Error"), text);
   2562 }
   2563 
   2564 KJ_TEST("HttpServer bad request") {
   2565   KJ_HTTP_TEST_SETUP_IO;
   2566   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2567   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2568 
   2569   HttpHeaderTable table;
   2570   BrokenHttpService service;
   2571   HttpServer server(timer, table, service);
   2572 
   2573   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2574 
   2575   static constexpr auto request = "GET / HTTP/1.1\r\nbad request\r\n\r\n"_kj;
   2576   auto writePromise = pipe.ends[1]->write(request.begin(), request.size());
   2577   auto response = pipe.ends[1]->readAllText().wait(waitScope);
   2578   KJ_EXPECT(writePromise.poll(waitScope));
   2579   writePromise.wait(waitScope);
   2580 
   2581   static constexpr auto expectedResponse =
   2582       "HTTP/1.1 400 Bad Request\r\n"
   2583       "Connection: close\r\n"
   2584       "Content-Length: 53\r\n"
   2585       "Content-Type: text/plain\r\n"
   2586       "\r\n"
   2587       "ERROR: The headers sent by your client are not valid."_kj;
   2588 
   2589   KJ_EXPECT(expectedResponse == response, expectedResponse, response);
   2590 }
   2591 
   2592 KJ_TEST("HttpServer invalid method") {
   2593   KJ_HTTP_TEST_SETUP_IO;
   2594   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2595   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2596 
   2597   HttpHeaderTable table;
   2598   BrokenHttpService service;
   2599   HttpServer server(timer, table, service);
   2600 
   2601   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2602 
   2603   static constexpr auto request = "bad request\r\n\r\n"_kj;
   2604   auto writePromise = pipe.ends[1]->write(request.begin(), request.size());
   2605   auto response = pipe.ends[1]->readAllText().wait(waitScope);
   2606   KJ_EXPECT(writePromise.poll(waitScope));
   2607   writePromise.wait(waitScope);
   2608 
   2609   static constexpr auto expectedResponse =
   2610       "HTTP/1.1 501 Not Implemented\r\n"
   2611       "Connection: close\r\n"
   2612       "Content-Length: 35\r\n"
   2613       "Content-Type: text/plain\r\n"
   2614       "\r\n"
   2615       "ERROR: Unrecognized request method."_kj;
   2616 
   2617   KJ_EXPECT(expectedResponse == response, expectedResponse, response);
   2618 }
   2619 
// Ensure that HttpServerSettings can continue to be constexpr.
// This object is never read (hence KJ_UNUSED); it exists only so that compilation fails if
// HttpServerSettings can no longer be value-initialized in a constant expression.
KJ_UNUSED static constexpr HttpServerSettings STATIC_CONSTEXPR_SETTINGS {};
   2622 
   2623 class TestErrorHandler: public HttpServerErrorHandler {
   2624 public:
   2625   kj::Promise<void> handleClientProtocolError(
   2626       HttpHeaders::ProtocolError protocolError, kj::HttpService::Response& response) override {
   2627     // In a real error handler, you should redact `protocolError.rawContent`.
   2628     auto message = kj::str("Saw protocol error: ", protocolError.description, "; rawContent = ",
   2629         encodeCEscape(protocolError.rawContent));
   2630     return sendError(400, "Bad Request", kj::mv(message), response);
   2631   }
   2632 
   2633   kj::Promise<void> handleApplicationError(
   2634       kj::Exception exception, kj::Maybe<kj::HttpService::Response&> response) override {
   2635     return sendError(500, "Internal Server Error",
   2636         kj::str("Saw application error: ", exception.getDescription()), response);
   2637   }
   2638 
   2639   kj::Promise<void> handleNoResponse(kj::HttpService::Response& response) override {
   2640     return sendError(500, "Internal Server Error", kj::str("Saw no response."), response);
   2641   }
   2642 
   2643   static TestErrorHandler instance;
   2644 
   2645 private:
   2646   kj::Promise<void> sendError(uint statusCode, kj::StringPtr statusText, String message,
   2647       Maybe<HttpService::Response&> response) {
   2648     KJ_IF_MAYBE(r, response) {
   2649       HttpHeaderTable headerTable;
   2650       HttpHeaders headers(headerTable);
   2651       auto body = r->send(statusCode, statusText, headers, message.size());
   2652       return body->write(message.begin(), message.size()).attach(kj::mv(body), kj::mv(message));
   2653     } else {
   2654       KJ_LOG(ERROR, "Saw an error but too late to report to client.");
   2655       return kj::READY_NOW;
   2656     }
   2657   }
   2658 };
   2659 
   2660 TestErrorHandler TestErrorHandler::instance {};
   2661 
   2662 KJ_TEST("HttpServer no response, custom error handler") {
   2663   auto PIPELINE_TESTS = pipelineTestCases();
   2664 
   2665   KJ_HTTP_TEST_SETUP_IO;
   2666   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2667   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2668 
   2669   HttpServerSettings settings {};
   2670   settings.errorHandler = TestErrorHandler::instance;
   2671 
   2672   HttpHeaderTable table;
   2673   BrokenHttpService service;
   2674   HttpServer server(timer, table, service, settings);
   2675 
   2676   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2677 
   2678   // Do one request.
   2679   pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
   2680       .wait(waitScope);
   2681   auto text = pipe.ends[1]->readAllText().wait(waitScope);
   2682 
   2683   KJ_EXPECT(text ==
   2684       "HTTP/1.1 500 Internal Server Error\r\n"
   2685       "Connection: close\r\n"
   2686       "Content-Length: 16\r\n"
   2687       "\r\n"
   2688       "Saw no response.", text);
   2689 }
   2690 
   2691 KJ_TEST("HttpServer threw exception, custom error handler") {
   2692   auto PIPELINE_TESTS = pipelineTestCases();
   2693 
   2694   KJ_HTTP_TEST_SETUP_IO;
   2695   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2696   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2697 
   2698   HttpServerSettings settings {};
   2699   settings.errorHandler = TestErrorHandler::instance;
   2700 
   2701   HttpHeaderTable table;
   2702   BrokenHttpService service(KJ_EXCEPTION(FAILED, "failed"));
   2703   HttpServer server(timer, table, service, settings);
   2704 
   2705   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2706 
   2707   // Do one request.
   2708   pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
   2709       .wait(waitScope);
   2710   auto text = pipe.ends[1]->readAllText().wait(waitScope);
   2711 
   2712   KJ_EXPECT(text ==
   2713       "HTTP/1.1 500 Internal Server Error\r\n"
   2714       "Connection: close\r\n"
   2715       "Content-Length: 29\r\n"
   2716       "\r\n"
   2717       "Saw application error: failed", text);
   2718 }
   2719 
   2720 KJ_TEST("HttpServer bad request, custom error handler") {
   2721   KJ_HTTP_TEST_SETUP_IO;
   2722   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2723   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2724 
   2725   HttpServerSettings settings {};
   2726   settings.errorHandler = TestErrorHandler::instance;
   2727 
   2728   HttpHeaderTable table;
   2729   BrokenHttpService service;
   2730   HttpServer server(timer, table, service, settings);
   2731 
   2732   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2733 
   2734   static constexpr auto request = "bad request\r\n\r\n"_kj;
   2735   auto writePromise = pipe.ends[1]->write(request.begin(), request.size());
   2736   auto response = pipe.ends[1]->readAllText().wait(waitScope);
   2737   KJ_EXPECT(writePromise.poll(waitScope));
   2738   writePromise.wait(waitScope);
   2739 
   2740   static constexpr auto expectedResponse =
   2741       "HTTP/1.1 400 Bad Request\r\n"
   2742       "Connection: close\r\n"
   2743       "Content-Length: 80\r\n"
   2744       "\r\n"
   2745       "Saw protocol error: Unrecognized request method.; "
   2746       "rawContent = bad request\\000\\n"_kj;
   2747 
   2748   KJ_EXPECT(expectedResponse == response, expectedResponse, response);
   2749 }
   2750 
   2751 class PartialResponseService final: public HttpService {
   2752   // HttpService that sends a partial response then throws.
   2753 public:
   2754   kj::Promise<void> request(
   2755       HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
   2756       kj::AsyncInputStream& requestBody, Response& response) override {
   2757     return requestBody.readAllBytes()
   2758         .then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> {
   2759       HttpHeaders headers(table);
   2760       auto body = response.send(200, "OK", headers, 32);
   2761       auto promise = body->write("foo", 3);
   2762       return promise.attach(kj::mv(body)).then([]() -> kj::Promise<void> {
   2763         return KJ_EXCEPTION(FAILED, "failed");
   2764       });
   2765     });
   2766   }
   2767 
   2768 private:
   2769   kj::Maybe<kj::Exception> exception;
   2770   HttpHeaderTable table;
   2771 };
   2772 
   2773 KJ_TEST("HttpServer threw exception after starting response") {
   2774   auto PIPELINE_TESTS = pipelineTestCases();
   2775 
   2776   KJ_HTTP_TEST_SETUP_IO;
   2777   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2778   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2779 
   2780   HttpHeaderTable table;
   2781   PartialResponseService service;
   2782   HttpServer server(timer, table, service);
   2783 
   2784   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2785 
   2786   KJ_EXPECT_LOG(ERROR, "HttpService threw exception after generating a partial response");
   2787 
   2788   // Do one request.
   2789   pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
   2790       .wait(waitScope);
   2791   auto text = pipe.ends[1]->readAllText().wait(waitScope);
   2792 
   2793   KJ_EXPECT(text ==
   2794       "HTTP/1.1 200 OK\r\n"
   2795       "Content-Length: 32\r\n"
   2796       "\r\n"
   2797       "foo", text);
   2798 }
   2799 
   2800 class PartialResponseNoThrowService final: public HttpService {
   2801   // HttpService that sends a partial response then returns without throwing.
   2802 public:
   2803   kj::Promise<void> request(
   2804       HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
   2805       kj::AsyncInputStream& requestBody, Response& response) override {
   2806     return requestBody.readAllBytes()
   2807         .then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> {
   2808       HttpHeaders headers(table);
   2809       auto body = response.send(200, "OK", headers, 32);
   2810       auto promise = body->write("foo", 3);
   2811       return promise.attach(kj::mv(body));
   2812     });
   2813   }
   2814 
   2815 private:
   2816   kj::Maybe<kj::Exception> exception;
   2817   HttpHeaderTable table;
   2818 };
   2819 
   2820 KJ_TEST("HttpServer failed to write complete response but didn't throw") {
   2821   auto PIPELINE_TESTS = pipelineTestCases();
   2822 
   2823   KJ_HTTP_TEST_SETUP_IO;
   2824   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2825   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2826 
   2827   HttpHeaderTable table;
   2828   PartialResponseNoThrowService service;
   2829   HttpServer server(timer, table, service);
   2830 
   2831   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2832 
   2833   // Do one request.
   2834   pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
   2835       .wait(waitScope);
   2836   auto text = pipe.ends[1]->readAllText().wait(waitScope);
   2837 
   2838   KJ_EXPECT(text ==
   2839       "HTTP/1.1 200 OK\r\n"
   2840       "Content-Length: 32\r\n"
   2841       "\r\n"
   2842       "foo", text);
   2843 }
   2844 
   2845 class SimpleInputStream final: public kj::AsyncInputStream {
   2846   // An InputStream that returns bytes out of a static string.
   2847 
   2848 public:
   2849   SimpleInputStream(kj::StringPtr text)
   2850       : unread(text.asBytes()) {}
   2851 
   2852   kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
   2853     size_t amount = kj::min(maxBytes, unread.size());
   2854     memcpy(buffer, unread.begin(), amount);
   2855     unread = unread.slice(amount, unread.size());
   2856     return amount;
   2857   }
   2858 
   2859 private:
   2860   kj::ArrayPtr<const byte> unread;
   2861 };
   2862 
   2863 class PumpResponseService final: public HttpService {
   2864   // HttpService that uses pumpTo() to write a response, without carefully specifying how much to
   2865   // pump, but the stream happens to be the right size.
   2866 public:
   2867   kj::Promise<void> request(
   2868       HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
   2869       kj::AsyncInputStream& requestBody, Response& response) override {
   2870     return requestBody.readAllBytes()
   2871         .then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> {
   2872       HttpHeaders headers(table);
   2873       kj::StringPtr text = "Hello, World!";
   2874       auto body = response.send(200, "OK", headers, text.size());
   2875 
   2876       auto stream = kj::heap<SimpleInputStream>(text);
   2877       auto promise = stream->pumpTo(*body);
   2878       return promise.attach(kj::mv(body), kj::mv(stream))
   2879           .then([text](uint64_t amount) {
   2880         KJ_EXPECT(amount == text.size());
   2881       });
   2882     });
   2883   }
   2884 
   2885 private:
   2886   kj::Maybe<kj::Exception> exception;
   2887   HttpHeaderTable table;
   2888 };
   2889 
   2890 KJ_TEST("HttpFixedLengthEntityWriter correctly implements tryPumpFrom") {
   2891   auto PIPELINE_TESTS = pipelineTestCases();
   2892 
   2893   KJ_HTTP_TEST_SETUP_IO;
   2894   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2895   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2896 
   2897   HttpHeaderTable table;
   2898   PumpResponseService service;
   2899   HttpServer server(timer, table, service);
   2900 
   2901   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2902 
   2903   // Do one request.
   2904   pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
   2905       .wait(waitScope);
   2906   pipe.ends[1]->shutdownWrite();
   2907   auto text = pipe.ends[1]->readAllText().wait(waitScope);
   2908 
   2909   KJ_EXPECT(text ==
   2910       "HTTP/1.1 200 OK\r\n"
   2911       "Content-Length: 13\r\n"
   2912       "\r\n"
   2913       "Hello, World!", text);
   2914 }
   2915 
   2916 class HangingHttpService final: public HttpService {
   2917   // HttpService that hangs forever.
   2918 public:
   2919   kj::Promise<void> request(
   2920       HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
   2921       kj::AsyncInputStream& requestBody, Response& responseSender) override {
   2922     kj::Promise<void> result = kj::NEVER_DONE;
   2923     ++inFlight;
   2924     return result.attach(kj::defer([this]() {
   2925       if (--inFlight == 0) {
   2926         KJ_IF_MAYBE(f, onCancelFulfiller) {
   2927           f->get()->fulfill();
   2928         }
   2929       }
   2930     }));
   2931   }
   2932 
   2933   kj::Promise<void> onCancel() {
   2934     auto paf = kj::newPromiseAndFulfiller<void>();
   2935     onCancelFulfiller = kj::mv(paf.fulfiller);
   2936     return kj::mv(paf.promise);
   2937   }
   2938 
   2939   uint inFlight = 0;
   2940 
   2941 private:
   2942   kj::Maybe<kj::Exception> exception;
   2943   kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> onCancelFulfiller;
   2944 };
   2945 
   2946 KJ_TEST("HttpServer cancels request when client disconnects") {
   2947   KJ_HTTP_TEST_SETUP_IO;
   2948   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2949   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2950 
   2951   HttpHeaderTable table;
   2952   HangingHttpService service;
   2953   HttpServer server(timer, table, service);
   2954 
   2955   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   2956 
   2957   KJ_EXPECT(service.inFlight == 0);
   2958 
   2959   static constexpr auto request = "GET / HTTP/1.1\r\n\r\n"_kj;
   2960   pipe.ends[1]->write(request.begin(), request.size()).wait(waitScope);
   2961 
   2962   auto cancelPromise = service.onCancel();
   2963   KJ_EXPECT(!cancelPromise.poll(waitScope));
   2964   KJ_EXPECT(service.inFlight == 1);
   2965 
   2966   // Disconnect client and verify server cancels.
   2967   pipe.ends[1] = nullptr;
   2968   KJ_ASSERT(cancelPromise.poll(waitScope));
   2969   KJ_EXPECT(service.inFlight == 0);
   2970   cancelPromise.wait(waitScope);
   2971 }
   2972 
   2973 // -----------------------------------------------------------------------------
   2974 
   2975 KJ_TEST("newHttpService from HttpClient") {
   2976   auto PIPELINE_TESTS = pipelineTestCases();
   2977 
   2978   KJ_HTTP_TEST_SETUP_IO;
   2979   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   2980   auto frontPipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2981   auto backPipe = KJ_HTTP_TEST_CREATE_2PIPE;
   2982 
   2983   kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
   2984   for (auto& testCase: PIPELINE_TESTS) {
   2985     writeResponsesPromise = writeResponsesPromise
   2986         .then([&]() {
   2987       return expectRead(*backPipe.ends[1], testCase.request.raw);
   2988     }).then([&]() {
   2989       return backPipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size());
   2990     });
   2991   }
   2992 
   2993   {
   2994     HttpHeaderTable table;
   2995     auto backClient = newHttpClient(table, *backPipe.ends[0]);
   2996     auto frontService = newHttpService(*backClient);
   2997     HttpServer frontServer(timer, table, *frontService);
   2998     auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
   2999 
   3000     for (auto& testCase: PIPELINE_TESTS) {
   3001       KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
   3002 
   3003       frontPipe.ends[0]->write(testCase.request.raw.begin(), testCase.request.raw.size())
   3004                .wait(waitScope);
   3005 
   3006       expectRead(*frontPipe.ends[0], testCase.response.raw).wait(waitScope);
   3007     }
   3008 
   3009     frontPipe.ends[0]->shutdownWrite();
   3010     listenTask.wait(waitScope);
   3011   }
   3012 
   3013   backPipe.ends[0]->shutdownWrite();
   3014   writeResponsesPromise.wait(waitScope);
   3015 }
   3016 
   3017 KJ_TEST("newHttpService from HttpClient WebSockets") {
   3018   KJ_HTTP_TEST_SETUP_IO;
   3019   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   3020   auto frontPipe = KJ_HTTP_TEST_CREATE_2PIPE;
   3021   auto backPipe = KJ_HTTP_TEST_CREATE_2PIPE;
   3022 
   3023   auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
   3024   auto writeResponsesPromise = expectRead(*backPipe.ends[1], request)
   3025       .then([&]() { return writeA(*backPipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)); })
   3026       .then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE); })
   3027       .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
   3028       .then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_REPLY_MESSAGE); })
   3029       .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_CLOSE); })
   3030       .then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_REPLY_CLOSE); })
   3031       .then([&]() { return expectEnd(*backPipe.ends[1]); })
   3032       .then([&]() { backPipe.ends[1]->shutdownWrite(); })
   3033       .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
   3034 
   3035   {
   3036     HttpHeaderTable table;
   3037     FakeEntropySource entropySource;
   3038     HttpClientSettings clientSettings;
   3039     clientSettings.entropySource = entropySource;
   3040     auto backClientStream = kj::mv(backPipe.ends[0]);
   3041     auto backClient = newHttpClient(table, *backClientStream, clientSettings);
   3042     auto frontService = newHttpService(*backClient);
   3043     HttpServer frontServer(timer, table, *frontService);
   3044     auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
   3045 
   3046     writeA(*frontPipe.ends[0], request.asBytes()).wait(waitScope);
   3047     expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);
   3048 
   3049     expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
   3050     writeA(*frontPipe.ends[0], WEBSOCKET_SEND_MESSAGE).wait(waitScope);
   3051     expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_MESSAGE).wait(waitScope);
   3052     writeA(*frontPipe.ends[0], WEBSOCKET_SEND_CLOSE).wait(waitScope);
   3053     expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_CLOSE).wait(waitScope);
   3054 
   3055     frontPipe.ends[0]->shutdownWrite();
   3056     listenTask.wait(waitScope);
   3057   }
   3058 
   3059   writeResponsesPromise.wait(waitScope);
   3060 }
   3061 
   3062 KJ_TEST("newHttpService from HttpClient WebSockets disconnect") {
   3063   KJ_HTTP_TEST_SETUP_IO;
   3064   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   3065   auto frontPipe = KJ_HTTP_TEST_CREATE_2PIPE;
   3066   auto backPipe = KJ_HTTP_TEST_CREATE_2PIPE;
   3067 
   3068   auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
   3069   auto writeResponsesPromise = expectRead(*backPipe.ends[1], request)
   3070       .then([&]() { return writeA(*backPipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)); })
   3071       .then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE); })
   3072       .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
   3073       .then([&]() { backPipe.ends[1]->shutdownWrite(); })
   3074       .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
   3075 
   3076   {
   3077     HttpHeaderTable table;
   3078     FakeEntropySource entropySource;
   3079     HttpClientSettings clientSettings;
   3080     clientSettings.entropySource = entropySource;
   3081     auto backClient = newHttpClient(table, *backPipe.ends[0], clientSettings);
   3082     auto frontService = newHttpService(*backClient);
   3083     HttpServer frontServer(timer, table, *frontService);
   3084     auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
   3085 
   3086     writeA(*frontPipe.ends[0], request.asBytes()).wait(waitScope);
   3087     expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);
   3088 
   3089     expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
   3090     writeA(*frontPipe.ends[0], WEBSOCKET_SEND_MESSAGE).wait(waitScope);
   3091 
   3092     KJ_EXPECT(frontPipe.ends[0]->readAllText().wait(waitScope) == "");
   3093 
   3094     frontPipe.ends[0]->shutdownWrite();
   3095     listenTask.wait(waitScope);
   3096   }
   3097 
   3098   writeResponsesPromise.wait(waitScope);
   3099 }
   3100 
   3101 // -----------------------------------------------------------------------------
   3102 
   3103 KJ_TEST("newHttpClient from HttpService") {
   3104   auto PIPELINE_TESTS = pipelineTestCases();
   3105 
   3106   KJ_HTTP_TEST_SETUP_IO;
   3107   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   3108 
   3109   HttpHeaderTable table;
   3110   TestHttpService service(PIPELINE_TESTS, table);
   3111   auto client = newHttpClient(service);
   3112 
   3113   for (auto& testCase: PIPELINE_TESTS) {
   3114     testHttpClient(waitScope, table, *client, testCase);
   3115   }
   3116 }
   3117 
   3118 KJ_TEST("newHttpClient from HttpService WebSockets") {
   3119   KJ_HTTP_TEST_SETUP_IO;
   3120   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   3121   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   3122 
   3123   HttpHeaderTable::Builder tableBuilder;
   3124   HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
   3125   auto headerTable = tableBuilder.build();
   3126   TestWebSocketService service(*headerTable, hMyHeader);
   3127   auto client = newHttpClient(service);
   3128 
   3129   testWebSocketClient(waitScope, *headerTable, hMyHeader, *client);
   3130 }
   3131 
   3132 KJ_TEST("adapted client/server propagates request exceptions like non-adapted client") {
   3133   KJ_HTTP_TEST_SETUP_IO;
   3134 
   3135   HttpHeaderTable table;
   3136   HttpHeaders headers(table);
   3137 
   3138   class FailingHttpClient final: public HttpClient {
   3139   public:
   3140     Request request(
   3141         HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
   3142         kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
   3143       KJ_FAIL_ASSERT("request_fail");
   3144     }
   3145 
   3146     kj::Promise<WebSocketResponse> openWebSocket(
   3147         kj::StringPtr url, const HttpHeaders& headers) override {
   3148       KJ_FAIL_ASSERT("websocket_fail");
   3149     }
   3150   };
   3151 
   3152   auto rawClient = kj::heap<FailingHttpClient>();
   3153 
   3154   auto innerClient = kj::heap<FailingHttpClient>();
   3155   auto adaptedService = kj::newHttpService(*innerClient).attach(kj::mv(innerClient));
   3156   auto adaptedClient = kj::newHttpClient(*adaptedService).attach(kj::mv(adaptedService));
   3157 
   3158   KJ_EXPECT_THROW_MESSAGE("request_fail", rawClient->request(HttpMethod::POST, "/"_kj, headers));
   3159   KJ_EXPECT_THROW_MESSAGE("request_fail", adaptedClient->request(HttpMethod::POST, "/"_kj, headers));
   3160 
   3161   KJ_EXPECT_THROW_MESSAGE("websocket_fail", rawClient->openWebSocket("/"_kj, headers));
   3162   KJ_EXPECT_THROW_MESSAGE("websocket_fail", adaptedClient->openWebSocket("/"_kj, headers));
   3163 }
   3164 
   3165 class DelayedCompletionHttpService final: public HttpService {
   3166 public:
   3167   DelayedCompletionHttpService(HttpHeaderTable& table, kj::Maybe<uint64_t> expectedLength)
   3168       : table(table), expectedLength(expectedLength) {}
   3169 
   3170   kj::Promise<void> request(
   3171       HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
   3172       kj::AsyncInputStream& requestBody, Response& response) override {
   3173     auto stream = response.send(200, "OK", HttpHeaders(table), expectedLength);
   3174     auto promise = stream->write("foo", 3);
   3175     return promise.attach(kj::mv(stream)).then([this]() {
   3176       return kj::mv(paf.promise);
   3177     });
   3178   }
   3179 
   3180   kj::PromiseFulfiller<void>& getFulfiller() { return *paf.fulfiller; }
   3181 
   3182 private:
   3183   HttpHeaderTable& table;
   3184   kj::Maybe<uint64_t> expectedLength;
   3185   kj::PromiseFulfillerPair<void> paf = kj::newPromiseAndFulfiller<void>();
   3186 };
   3187 
   3188 void doDelayedCompletionTest(bool exception, kj::Maybe<uint64_t> expectedLength) noexcept {
   3189   KJ_HTTP_TEST_SETUP_IO;
   3190 
   3191   HttpHeaderTable table;
   3192 
   3193   DelayedCompletionHttpService service(table, expectedLength);
   3194   auto client = newHttpClient(service);
   3195 
   3196   auto resp = client->request(HttpMethod::GET, "/", HttpHeaders(table), uint64_t(0))
   3197       .response.wait(waitScope);
   3198   KJ_EXPECT(resp.statusCode == 200);
   3199 
   3200   // Read "foo" from the response body: works
   3201   char buffer[16];
   3202   KJ_ASSERT(resp.body->tryRead(buffer, 1, sizeof(buffer)).wait(waitScope) == 3);
   3203   buffer[3] = '\0';
   3204   KJ_EXPECT(buffer == "foo"_kj);
   3205 
   3206   // But reading any more hangs.
   3207   auto promise = resp.body->tryRead(buffer, 1, sizeof(buffer));
   3208 
   3209   KJ_EXPECT(!promise.poll(waitScope));
   3210 
   3211   // Until we cause the service to return.
   3212   if (exception) {
   3213     service.getFulfiller().reject(KJ_EXCEPTION(FAILED, "service-side failure"));
   3214   } else {
   3215     service.getFulfiller().fulfill();
   3216   }
   3217 
   3218   KJ_ASSERT(promise.poll(waitScope));
   3219 
   3220   if (exception) {
   3221     KJ_EXPECT_THROW_MESSAGE("service-side failure", promise.wait(waitScope));
   3222   } else {
   3223     promise.wait(waitScope);
   3224   }
   3225 };
   3226 
   3227 KJ_TEST("adapted client waits for service to complete before returning EOF on response stream") {
   3228   doDelayedCompletionTest(false, uint64_t(3));
   3229 }
   3230 
   3231 KJ_TEST("adapted client waits for service to complete before returning EOF on chunked response") {
   3232   doDelayedCompletionTest(false, nullptr);
   3233 }
   3234 
   3235 KJ_TEST("adapted client propagates throw from service after complete response body sent") {
   3236   doDelayedCompletionTest(true, uint64_t(3));
   3237 }
   3238 
   3239 KJ_TEST("adapted client propagates throw from service after incomplete response body sent") {
   3240   doDelayedCompletionTest(true, uint64_t(6));
   3241 }
   3242 
   3243 KJ_TEST("adapted client propagates throw from service after chunked response body sent") {
   3244   doDelayedCompletionTest(true, nullptr);
   3245 }
   3246 
   3247 class DelayedCompletionWebSocketHttpService final: public HttpService {
   3248 public:
   3249   DelayedCompletionWebSocketHttpService(HttpHeaderTable& table, bool closeUpstreamFirst)
   3250       : table(table), closeUpstreamFirst(closeUpstreamFirst) {}
   3251 
   3252   kj::Promise<void> request(
   3253       HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
   3254       kj::AsyncInputStream& requestBody, Response& response) override {
   3255     KJ_ASSERT(headers.isWebSocket());
   3256 
   3257     auto ws = response.acceptWebSocket(HttpHeaders(table));
   3258     kj::Promise<void> promise = kj::READY_NOW;
   3259     if (closeUpstreamFirst) {
   3260       // Wait for a close message from the client before starting.
   3261       promise = promise.then([&ws = *ws]() { return ws.receive(); }).ignoreResult();
   3262     }
   3263     promise = promise
   3264         .then([&ws = *ws]() { return ws.send("foo"_kj); })
   3265         .then([&ws = *ws]() { return ws.close(1234, "closed"_kj); });
   3266     if (!closeUpstreamFirst) {
   3267       // Wait for a close message from the client at the end.
   3268       promise = promise.then([&ws = *ws]() { return ws.receive(); }).ignoreResult();
   3269     }
   3270     return promise.attach(kj::mv(ws)).then([this]() {
   3271       return kj::mv(paf.promise);
   3272     });
   3273   }
   3274 
   3275   kj::PromiseFulfiller<void>& getFulfiller() { return *paf.fulfiller; }
   3276 
   3277 private:
   3278   HttpHeaderTable& table;
   3279   bool closeUpstreamFirst;
   3280   kj::PromiseFulfillerPair<void> paf = kj::newPromiseAndFulfiller<void>();
   3281 };
   3282 
   3283 void doDelayedCompletionWebSocketTest(bool exception, bool closeUpstreamFirst) noexcept {
   3284   KJ_HTTP_TEST_SETUP_IO;
   3285 
   3286   HttpHeaderTable table;
   3287 
   3288   DelayedCompletionWebSocketHttpService service(table, closeUpstreamFirst);
   3289   auto client = newHttpClient(service);
   3290 
   3291   auto resp = client->openWebSocket("/", HttpHeaders(table)).wait(waitScope);
   3292   auto ws = kj::mv(KJ_ASSERT_NONNULL(resp.webSocketOrBody.tryGet<kj::Own<WebSocket>>()));
   3293 
   3294   if (closeUpstreamFirst) {
   3295     // Send "close" immediately.
   3296     ws->close(1234, "whatever"_kj).wait(waitScope);
   3297   }
   3298 
   3299   // Read "foo" from the WebSocket: works
   3300   {
   3301     auto msg = ws->receive().wait(waitScope);
   3302     KJ_ASSERT(msg.is<kj::String>());
   3303     KJ_ASSERT(msg.get<kj::String>() == "foo");
   3304   }
   3305 
   3306   kj::Promise<void> promise = nullptr;
   3307   if (closeUpstreamFirst) {
   3308     // Receiving the close hangs.
   3309     promise = ws->receive()
   3310         .then([](WebSocket::Message&& msg) { KJ_EXPECT(msg.is<WebSocket::Close>()); });
   3311   } else {
   3312     auto msg = ws->receive().wait(waitScope);
   3313     KJ_ASSERT(msg.is<WebSocket::Close>());
   3314 
   3315     // Sending a close hangs.
   3316     promise = ws->close(1234, "whatever"_kj);
   3317   }
   3318   KJ_EXPECT(!promise.poll(waitScope));
   3319 
   3320   // Until we cause the service to return.
   3321   if (exception) {
   3322     service.getFulfiller().reject(KJ_EXCEPTION(FAILED, "service-side failure"));
   3323   } else {
   3324     service.getFulfiller().fulfill();
   3325   }
   3326 
   3327   KJ_ASSERT(promise.poll(waitScope));
   3328 
   3329   if (exception) {
   3330     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("service-side failure", promise.wait(waitScope));
   3331   } else {
   3332     promise.wait(waitScope);
   3333   }
   3334 };
   3335 
   3336 KJ_TEST("adapted client waits for service to complete before completing upstream close on WebSocket") {
   3337   doDelayedCompletionWebSocketTest(false, false);
   3338 }
   3339 
   3340 KJ_TEST("adapted client waits for service to complete before returning downstream close on WebSocket") {
   3341   doDelayedCompletionWebSocketTest(false, true);
   3342 }
   3343 
   3344 KJ_TEST("adapted client propagates throw from service after WebSocket upstream close sent") {
   3345   doDelayedCompletionWebSocketTest(true, false);
   3346 }
   3347 
   3348 KJ_TEST("adapted client propagates throw from service after WebSocket downstream close sent") {
   3349   doDelayedCompletionWebSocketTest(true, true);
   3350 }
   3351 
   3352 // -----------------------------------------------------------------------------
   3353 
   3354 class CountingIoStream final: public kj::AsyncIoStream {
   3355   // An AsyncIoStream wrapper which decrements a counter when destroyed (allowing us to count how
   3356   // many connections are open).
   3357 
   3358 public:
   3359   CountingIoStream(kj::Own<kj::AsyncIoStream> inner, uint& count)
   3360       : inner(kj::mv(inner)), count(count) {}
   3361   ~CountingIoStream() noexcept(false) {
   3362     --count;
   3363   }
   3364 
   3365   kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
   3366     return inner->read(buffer, minBytes, maxBytes);
   3367   }
   3368   kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
   3369     return inner->tryRead(buffer, minBytes, maxBytes);
   3370   }
   3371   kj::Maybe<uint64_t> tryGetLength() override {
   3372     return inner->tryGetLength();;
   3373   }
   3374   kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
   3375     return inner->pumpTo(output, amount);
   3376   }
   3377   kj::Promise<void> write(const void* buffer, size_t size) override {
   3378     return inner->write(buffer, size);
   3379   }
   3380   kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
   3381     return inner->write(pieces);
   3382   }
   3383   kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
   3384       kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
   3385     return inner->tryPumpFrom(input, amount);
   3386   }
   3387   Promise<void> whenWriteDisconnected() override {
   3388     return inner->whenWriteDisconnected();
   3389   }
   3390   void shutdownWrite() override {
   3391     return inner->shutdownWrite();
   3392   }
   3393   void abortRead() override {
   3394     return inner->abortRead();
   3395   }
   3396 
   3397 public:
   3398   kj::Own<AsyncIoStream> inner;
   3399   uint& count;
   3400 };
   3401 
   3402 class CountingNetworkAddress final: public kj::NetworkAddress {
   3403 public:
   3404   CountingNetworkAddress(kj::NetworkAddress& inner, uint& count, uint& cumulative)
   3405       : inner(inner), count(count), addrCount(ownAddrCount), cumulative(cumulative) {}
   3406   CountingNetworkAddress(kj::Own<kj::NetworkAddress> inner, uint& count, uint& addrCount)
   3407       : inner(*inner), ownInner(kj::mv(inner)), count(count), addrCount(addrCount),
   3408         cumulative(ownCumulative) {}
   3409   ~CountingNetworkAddress() noexcept(false) {
   3410     --addrCount;
   3411   }
   3412 
   3413   kj::Promise<kj::Own<kj::AsyncIoStream>> connect() override {
   3414     ++count;
   3415     ++cumulative;
   3416     return inner.connect()
   3417         .then([this](kj::Own<kj::AsyncIoStream> stream) -> kj::Own<kj::AsyncIoStream> {
   3418       return kj::heap<CountingIoStream>(kj::mv(stream), count);
   3419     });
   3420   }
   3421 
   3422   kj::Own<kj::ConnectionReceiver> listen() override { KJ_UNIMPLEMENTED("test"); }
   3423   kj::Own<kj::NetworkAddress> clone() override { KJ_UNIMPLEMENTED("test"); }
   3424   kj::String toString() override { KJ_UNIMPLEMENTED("test"); }
   3425 
   3426 private:
   3427   kj::NetworkAddress& inner;
   3428   kj::Own<kj::NetworkAddress> ownInner;
   3429   uint& count;
   3430   uint ownAddrCount = 1;
   3431   uint& addrCount;
   3432   uint ownCumulative = 0;
   3433   uint& cumulative;
   3434 };
   3435 
   3436 class ConnectionCountingNetwork final: public kj::Network {
   3437 public:
   3438   ConnectionCountingNetwork(kj::Network& inner, uint& count, uint& addrCount)
   3439       : inner(inner), count(count), addrCount(addrCount) {}
   3440 
   3441   Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
   3442     ++addrCount;
   3443     return inner.parseAddress(addr, portHint)
   3444         .then([this](Own<NetworkAddress>&& addr) -> Own<NetworkAddress> {
   3445       return kj::heap<CountingNetworkAddress>(kj::mv(addr), count, addrCount);
   3446     });
   3447   }
   3448   Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
   3449     KJ_UNIMPLEMENTED("test");
   3450   }
   3451   Own<Network> restrictPeers(
   3452       kj::ArrayPtr<const kj::StringPtr> allow,
   3453       kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
   3454     KJ_UNIMPLEMENTED("test");
   3455   }
   3456 
   3457 private:
   3458   kj::Network& inner;
   3459   uint& count;
   3460   uint& addrCount;
   3461 };
   3462 
   3463 class DummyService final: public HttpService {
   3464 public:
   3465   DummyService(HttpHeaderTable& headerTable): headerTable(headerTable) {}
   3466 
   3467   kj::Promise<void> request(
   3468       HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
   3469       kj::AsyncInputStream& requestBody, Response& response) override {
   3470     if (!headers.isWebSocket()) {
   3471       if (url == "/throw") {
   3472         return KJ_EXCEPTION(FAILED, "client requested failure");
   3473       }
   3474 
   3475       auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
   3476       auto stream = response.send(200, "OK", HttpHeaders(headerTable), body.size());
   3477       auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
   3478       promises.add(stream->write(body.begin(), body.size()));
   3479       promises.add(requestBody.readAllBytes().ignoreResult());
   3480       return kj::joinPromises(promises.finish()).attach(kj::mv(stream), kj::mv(body));
   3481     } else {
   3482       auto ws = response.acceptWebSocket(HttpHeaders(headerTable));
   3483       auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
   3484       auto sendPromise = ws->send(body);
   3485 
   3486       auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
   3487       promises.add(sendPromise.attach(kj::mv(body)));
   3488       promises.add(ws->receive().ignoreResult());
   3489       return kj::joinPromises(promises.finish()).attach(kj::mv(ws));
   3490     }
   3491   }
   3492 
   3493 private:
   3494   HttpHeaderTable& headerTable;
   3495 };
   3496 
KJ_TEST("HttpClient connection management") {
  // Exercises connection reuse in the HttpClient returned by
  // newHttpClient(timer, headerTable, address, settings): when a connection is reused, when a new
  // one is opened, and when an existing one is dropped.
  //
  // Connections are observed through CountingNetworkAddress: `count` is the number of
  // connections currently alive and `cumulative` is the number of connect() calls so far.
  // Both timers are manually-advanced TimerImpls, so timeouts only fire on advanceTo().

  KJ_HTTP_TEST_SETUP_IO;
  KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR;

  kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
  kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
  HttpHeaderTable headerTable;

  DummyService service(headerTable);
  HttpServerSettings serverSettings;
  HttpServer server(serverTimer, headerTable, service, serverSettings);
  auto listenTask = server.listenHttp(*listener);

  uint count = 0;
  uint cumulative = 0;
  CountingNetworkAddress countingAddr(*addr, count, cumulative);

  FakeEntropySource entropySource;
  HttpClientSettings clientSettings;
  clientSettings.entropySource = entropySource;
  auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings);

  // Creating the client must not open any connection by itself.
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 0);

  // Issues "GET /<n>" with no Host header, reads the whole response body, and checks that
  // DummyService echoed back "null:/<n>". Each call uses a fresh n.
  uint i = 0;
  auto doRequest = [&]() {
    uint n = i++;
    return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response
        .then([](HttpClient::Response&& response) {
      auto promise = response.body->readAllText();
      return promise.attach(kj::mv(response.body));
    }).then([n](kj::String body) {
      KJ_EXPECT(body == kj::str("null:/", n));
    });
  };

  // We can do several requests in a row and only have one connection.
  doRequest().wait(waitScope);
  doRequest().wait(waitScope);
  doRequest().wait(waitScope);
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 1);

  // But if we do two in parallel, we'll end up with two connections.
  auto req1 = doRequest();
  auto req2 = doRequest();
  req1.wait(waitScope);
  req2.wait(waitScope);
  KJ_EXPECT(count == 2);
  KJ_EXPECT(cumulative == 2);

  // We can reuse after a POST, provided we write the whole POST body properly.
  {
    auto req = client->request(
        HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6));
    req.body->write("foobar", 6).wait(waitScope);
    req.response.wait(waitScope).body->readAllBytes().wait(waitScope);
  }
  KJ_EXPECT(count == 2);
  KJ_EXPECT(cumulative == 2);
  doRequest().wait(waitScope);
  KJ_EXPECT(count == 2);
  KJ_EXPECT(cumulative == 2);

  // Advance time for half the timeout, then exercise one of the connections.
  clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout / 2);
  doRequest().wait(waitScope);
  doRequest().wait(waitScope);
  waitScope.poll();
  KJ_EXPECT(count == 2);
  KJ_EXPECT(cumulative == 2);

  // Advance time past when the other connection should time out. It should be dropped.
  clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout * 3 / 4);
  waitScope.poll();
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 2);

  // Wait for the other to drop.
  clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout / 2);
  waitScope.poll();
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 2);

  // New request creates a new connection again.
  doRequest().wait(waitScope);
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 3);

  // WebSocket connections are not reused.
  // (The result of openWebSocket() is discarded immediately, so by the time we check, the
  // connection it took over has been destroyed and the count is back to zero.)
  client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable))
      .wait(waitScope);
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 3);

  // Errored connections are not reused.
  // (DummyService fails any request for "/throw".)
  doRequest().wait(waitScope);
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 4);
  client->request(HttpMethod::GET, kj::str("/throw"), HttpHeaders(headerTable)).response
      .wait(waitScope).body->readAllBytes().wait(waitScope);
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 4);

  // Connections where we failed to read the full response body are not reused.
  // (The response is discarded right after its headers arrive, without touching the body.)
  doRequest().wait(waitScope);
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 5);
  client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable)).response
      .wait(waitScope);
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 5);

  // Connections where we didn't even wait for the response headers are not reused.
  // (The whole request object is discarded at the end of the statement.)
  doRequest().wait(waitScope);
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 6);
  client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable));
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 6);

  // Connections where we failed to write the full request body are not reused.
  // (We promise a 6-byte POST body but never write any of it.)
  doRequest().wait(waitScope);
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 7);
  client->request(HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6)).response
      .wait(waitScope).body->readAllBytes().wait(waitScope);
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 7);

  // If the server times out the connection, we figure it out on the client.
  doRequest().wait(waitScope);

  // TODO(someday): Figure out why the following poll is necessary for the test to pass on Windows
  //   and Mac.  Without it, it seems that the request's connection never starts, so the
  //   subsequent advanceTo() does not actually time out the connection.
  waitScope.poll();

  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 8);
  // Only the *server's* timer is advanced here; the client has to notice the drop on its own.
  serverTimer.advanceTo(serverTimer.now() + serverSettings.pipelineTimeout * 2);
  waitScope.poll();
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 8);

  // Can still make requests.
  doRequest().wait(waitScope);
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 9);
}
   3648 
KJ_TEST("HttpClient disable connection reuse") {
  // Verifies that with clientSettings.idleTimeout set to zero, the HttpClient opens a new
  // connection for every request and keeps none of them around afterwards.
  //
  // Connections are observed through CountingNetworkAddress: `count` is the number of
  // connections currently alive and `cumulative` is the number of connect() calls so far.

  KJ_HTTP_TEST_SETUP_IO;
  KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR;

  kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
  kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
  HttpHeaderTable headerTable;

  DummyService service(headerTable);
  HttpServerSettings serverSettings;
  HttpServer server(serverTimer, headerTable, service, serverSettings);
  auto listenTask = server.listenHttp(*listener);

  uint count = 0;
  uint cumulative = 0;
  CountingNetworkAddress countingAddr(*addr, count, cumulative);

  FakeEntropySource entropySource;
  HttpClientSettings clientSettings;
  clientSettings.entropySource = entropySource;
  // A zero idle timeout is what disables reuse in this test.
  clientSettings.idleTimeout = 0 * kj::SECONDS;
  auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings);

  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 0);

  // Issues "GET /<n>" with no Host header, reads the whole response body, and checks that
  // DummyService echoed back "null:/<n>". Each call uses a fresh n.
  uint i = 0;
  auto doRequest = [&]() {
    uint n = i++;
    return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response
        .then([](HttpClient::Response&& response) {
      auto promise = response.body->readAllText();
      return promise.attach(kj::mv(response.body));
    }).then([n](kj::String body) {
      KJ_EXPECT(body == kj::str("null:/", n));
    });
  };

  // Each serial request gets its own connection.
  // (count == 0 afterwards: no connection is left open once its request has finished.)
  doRequest().wait(waitScope);
  doRequest().wait(waitScope);
  doRequest().wait(waitScope);
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 3);

  // Each parallel request gets its own connection.
  auto req1 = doRequest();
  auto req2 = doRequest();
  req1.wait(waitScope);
  req2.wait(waitScope);
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 5);
}
   3702 
KJ_TEST("HttpClient concurrency limiting") {
  // Exercises newConcurrencyLimitingHttpClient() with a limit argument of 1: a second request or
  // WebSocket must not start until the first has been released, and the supplied callback must
  // be invoked with the expected (runningCount, pendingCount) pairs along the way.
  //
  // The inner client has connection reuse disabled (idleTimeout == 0), so `cumulative` counts
  // one connection per request that actually started.

#if KJ_HTTP_TEST_USE_OS_PIPE && !__linux__
  // On Windows and Mac, OS event delivery is not always immediate, and that seems to make this
  // test flakey. On Linux, events are always immediately delivered. For now, we compile the test
  // but we don't run it outside of Linux. We do run the in-memory-pipes version on all OSs since
  // that mode shouldn't depend on kernel behavior at all.
  return;
#endif

  KJ_HTTP_TEST_SETUP_IO;
  KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR;

  kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
  kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
  HttpHeaderTable headerTable;

  DummyService service(headerTable);
  HttpServerSettings serverSettings;
  HttpServer server(serverTimer, headerTable, service, serverSettings);
  auto listenTask = server.listenHttp(*listener);

  uint count = 0;
  uint cumulative = 0;
  CountingNetworkAddress countingAddr(*addr, count, cumulative);

  FakeEntropySource entropySource;
  HttpClientSettings clientSettings;
  clientSettings.entropySource = entropySource;
  clientSettings.idleTimeout = 0 * kj::SECONDS;
  auto innerClient = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings);

  // One invocation of the concurrency-limiter callback, recorded so the test can compare the
  // exact sequence of invocations against an expected list.
  struct CallbackEvent {
    uint runningCount;
    uint pendingCount;

    bool operator==(const CallbackEvent& other) const {
      return runningCount == other.runningCount && pendingCount == other.pendingCount;
    }
    bool operator!=(const CallbackEvent& other) const { return !(*this == other); }
    // TODO(someday): Can use default spaceship operator in C++20:
    //auto operator<=>(const CallbackEvent&) const = default;
  };

  kj::Vector<CallbackEvent> callbackEvents;
  auto callback = [&](uint runningCount, uint pendingCount) {
    callbackEvents.add(CallbackEvent{runningCount, pendingCount});
  };
  auto client = newConcurrencyLimitingHttpClient(*innerClient, 1, kj::mv(callback));

  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 0);

  // Issues "GET /<n>" with no Host header, reads the whole response body, and checks that
  // DummyService echoed back "null:/<n>". Each call uses a fresh n.
  uint i = 0;
  auto doRequest = [&]() {
    uint n = i++;
    return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response
        .then([](HttpClient::Response&& response) {
      auto promise = response.body->readAllText();
      return promise.attach(kj::mv(response.body));
    }).then([n](kj::String body) {
      KJ_EXPECT(body == kj::str("null:/", n));
    });
  };

  // Second connection blocked by first.
  auto req1 = doRequest();

  KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} }));
  callbackEvents.clear();

  auto req2 = doRequest();

  // TODO(someday): Figure out why this poll() is necessary on Windows and macOS.
  waitScope.poll();

  KJ_EXPECT(req1.poll(waitScope));
  KJ_EXPECT(!req2.poll(waitScope));
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 1);
  KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 1} }));
  callbackEvents.clear();

  // Releasing first connection allows second to start.
  req1.wait(waitScope);
  KJ_EXPECT(req2.poll(waitScope));
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 2);
  KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} }));
  callbackEvents.clear();

  req2.wait(waitScope);
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 2);
  KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {0, 0} }));
  callbackEvents.clear();

  // Using body stream after releasing blocked response promise throws no exception
  // (req4 is issued while req3 is still running; only its body stream is kept, and the rest of
  // the request -- including the response promise -- is destroyed when the inner scope ends.)
  auto req3 = doRequest();
  {
    kj::Own<kj::AsyncOutputStream> req4Body;
    {
      auto req4 = client->request(HttpMethod::GET, kj::str("/", ++i), HttpHeaders(headerTable));
      waitScope.poll();
      req4Body = kj::mv(req4.body);
    }
    auto writePromise = req4Body->write("a", 1);
    KJ_EXPECT(!writePromise.poll(waitScope));
  }
  req3.wait(waitScope);
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 3);

  // Similar connection limiting for web sockets
  // TODO(someday): Figure out why the sequencing of websockets events does
  // not work correctly on Windows (and maybe macOS?).  The solution is not as
  // simple as inserting poll()s as above, since doing so puts the websocket in
  // a state that trips a "previous HTTP message body incomplete" assertion,
  // while trying to write 500 network response.
  callbackEvents.clear();
  auto ws1 = kj::heap(client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable)));
  KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} }));
  callbackEvents.clear();
  auto ws2 = kj::heap(client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable)));
  KJ_EXPECT(ws1->poll(waitScope));
  KJ_EXPECT(!ws2->poll(waitScope));
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 4);
  KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 1} }));
  callbackEvents.clear();

  {
    // While the first WebSocket response is still held, the second must stay blocked; it is only
    // released when response1 goes out of scope at the end of this block.
    auto response1 = ws1->wait(waitScope);
    KJ_EXPECT(!ws2->poll(waitScope));
    KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({}));
  }
  KJ_EXPECT(ws2->poll(waitScope));
  KJ_EXPECT(count == 1);
  KJ_EXPECT(cumulative == 5);
  KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} }));
  callbackEvents.clear();
  {
    auto response2 = ws2->wait(waitScope);
    KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({}));
  }
  KJ_EXPECT(count == 0);
  KJ_EXPECT(cumulative == 5);
  KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {0, 0} }));
}
   3851 
   3852 #if KJ_HTTP_TEST_USE_OS_PIPE
   3853 // TODO(someday): Implement mock kj::Network for userspace version of this test?
KJ_TEST("HttpClient multi host") {
  // Exercises the HttpClient created from two kj::Networks (one passed for plain connections,
  // one for TLS), which picks the destination from an absolute URL on each request. Two local
  // listeners on different ports stand in for two hosts.
  //
  // Both networks are ConnectionCountingNetwork wrappers around the same real network, so the
  // "TLS" network here does not actually perform TLS; it only lets the test see which of the two
  // networks the client used. For each network, `count`/`tlsCount` is the number of live
  // connections and `addrCount`/`tlsAddrCount` the number of live parsed addresses.

  auto io = kj::setupAsyncIo();

  kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
  kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
  HttpHeaderTable headerTable;

  auto listener1 = io.provider->getNetwork().parseAddress("localhost", 0)
      .wait(io.waitScope)->listen();
  auto listener2 = io.provider->getNetwork().parseAddress("localhost", 0)
      .wait(io.waitScope)->listen();
  DummyService service(headerTable);
  HttpServer server(serverTimer, headerTable, service);
  auto listenTask1 = server.listenHttp(*listener1);
  auto listenTask2 = server.listenHttp(*listener2);

  uint count = 0, addrCount = 0;
  uint tlsCount = 0, tlsAddrCount = 0;
  ConnectionCountingNetwork countingNetwork(io.provider->getNetwork(), count, addrCount);
  ConnectionCountingNetwork countingTlsNetwork(io.provider->getNetwork(), tlsCount, tlsAddrCount);

  HttpClientSettings clientSettings;
  auto client = newHttpClient(clientTimer, headerTable,
      countingNetwork, countingTlsNetwork, clientSettings);

  KJ_EXPECT(count == 0);

  // Issues a GET for "<scheme>://localhost:<port>//<n>", reads the whole response body, and
  // checks that DummyService echoed back "localhost:<port>://<n>". Each call uses a fresh n.
  uint i = 0;
  auto doRequest = [&](bool tls, uint port) {
    uint n = i++;
    // We stick a double-slash in the URL to test that it doesn't get coalesced into one slash,
    // which was a bug in the past.
    return client->request(HttpMethod::GET,
        kj::str((tls ? "https://localhost:" : "http://localhost:"), port, "//", n),
                HttpHeaders(headerTable)).response
        .then([](HttpClient::Response&& response) {
      auto promise = response.body->readAllText();
      return promise.attach(kj::mv(response.body));
    }).then([n, port](kj::String body) {
      KJ_EXPECT(body == kj::str("localhost:", port, "://", n), body, port, n);
    });
  };

  uint port1 = listener1->getPort();
  uint port2 = listener2->getPort();

  // We can do several requests in a row to the same host and only have one connection.
  doRequest(false, port1).wait(io.waitScope);
  doRequest(false, port1).wait(io.waitScope);
  doRequest(false, port1).wait(io.waitScope);
  KJ_EXPECT(count == 1);
  KJ_EXPECT(tlsCount == 0);
  KJ_EXPECT(addrCount == 1);
  KJ_EXPECT(tlsAddrCount == 0);

  // Request a different host, and now we have two connections.
  doRequest(false, port2).wait(io.waitScope);
  KJ_EXPECT(count == 2);
  KJ_EXPECT(tlsCount == 0);
  KJ_EXPECT(addrCount == 2);
  KJ_EXPECT(tlsAddrCount == 0);

  // Try TLS.
  doRequest(true, port1).wait(io.waitScope);
  KJ_EXPECT(count == 2);
  KJ_EXPECT(tlsCount == 1);
  KJ_EXPECT(addrCount == 2);
  KJ_EXPECT(tlsAddrCount == 1);

  // Try first host again, no change in connection count.
  doRequest(false, port1).wait(io.waitScope);
  KJ_EXPECT(count == 2);
  KJ_EXPECT(tlsCount == 1);
  KJ_EXPECT(addrCount == 2);
  KJ_EXPECT(tlsAddrCount == 1);

  // Multiple requests in parallel forces more connections to that host.
  auto promise1 = doRequest(false, port1);
  auto promise2 = doRequest(false, port1);
  promise1.wait(io.waitScope);
  promise2.wait(io.waitScope);
  KJ_EXPECT(count == 3);
  KJ_EXPECT(tlsCount == 1);
  KJ_EXPECT(addrCount == 2);
  KJ_EXPECT(tlsAddrCount == 1);

  // Let everything expire.
  // (The address counts drop to zero too, i.e. the parsed addresses are released along with
  // their connections.)
  clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout * 2);
  io.waitScope.poll();
  KJ_EXPECT(count == 0);
  KJ_EXPECT(tlsCount == 0);
  KJ_EXPECT(addrCount == 0);
  KJ_EXPECT(tlsAddrCount == 0);

  // We can still request those hosts again.
  doRequest(false, port1).wait(io.waitScope);
  KJ_EXPECT(count == 1);
  KJ_EXPECT(tlsCount == 0);
  KJ_EXPECT(addrCount == 1);
  KJ_EXPECT(tlsAddrCount == 0);
}
   3955 #endif
   3956 
   3957 // -----------------------------------------------------------------------------
   3958 
   3959 #if KJ_HTTP_TEST_USE_OS_PIPE
   3960 // This test only makes sense using the real network.
   3961 KJ_TEST("HttpClient to capnproto.org") {
   3962   auto io = kj::setupAsyncIo();
   3963 
   3964   auto maybeConn = io.provider->getNetwork().parseAddress("capnproto.org", 80)
   3965       .then([](kj::Own<kj::NetworkAddress> addr) {
   3966     auto promise = addr->connect();
   3967     return promise.attach(kj::mv(addr));
   3968   }).then([](kj::Own<kj::AsyncIoStream>&& connection) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> {
   3969     return kj::mv(connection);
   3970   }, [](kj::Exception&& e) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> {
   3971     KJ_LOG(WARNING, "skipping test because couldn't connect to capnproto.org");
   3972     return nullptr;
   3973   }).wait(io.waitScope);
   3974 
   3975   KJ_IF_MAYBE(conn, maybeConn) {
   3976     // Successfully connected to capnproto.org. Try doing GET /. We expect to get a redirect to
   3977     // HTTPS, because what kind of horrible web site would serve in plaintext, really?
   3978 
   3979     HttpHeaderTable table;
   3980     auto client = newHttpClient(table, **conn);
   3981 
   3982     HttpHeaders headers(table);
   3983     headers.set(HttpHeaderId::HOST, "capnproto.org");
   3984 
   3985     auto response = client->request(HttpMethod::GET, "/", headers).response.wait(io.waitScope);
   3986     KJ_EXPECT(response.statusCode / 100 == 3);
   3987     auto location = KJ_ASSERT_NONNULL(response.headers->get(HttpHeaderId::LOCATION));
   3988     KJ_EXPECT(location == "https://capnproto.org/");
   3989 
   3990     auto body = response.body->readAllText().wait(io.waitScope);
   3991   }
   3992 }
   3993 #endif
   3994 
   3995 // =======================================================================================
   3996 // Misc bugfix tests
   3997 
   3998 class ReadCancelHttpService final: public HttpService {
   3999   // HttpService that tries to read all request data but cancels after 1ms and sends a response.
   4000 public:
   4001   ReadCancelHttpService(kj::Timer& timer, HttpHeaderTable& headerTable)
   4002       : timer(timer), headerTable(headerTable) {}
   4003 
   4004   kj::Promise<void> request(
   4005       HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
   4006       kj::AsyncInputStream& requestBody, Response& responseSender) override {
   4007     if (method == HttpMethod::POST) {
   4008       // Try to read all content, but cancel after 1ms.
   4009       return requestBody.readAllBytes().ignoreResult()
   4010           .exclusiveJoin(timer.afterDelay(1 * kj::MILLISECONDS))
   4011           .then([this, &responseSender]() {
   4012         responseSender.send(408, "Request Timeout", kj::HttpHeaders(headerTable), uint64_t(0));
   4013       });
   4014     } else {
   4015       responseSender.send(200, "OK", kj::HttpHeaders(headerTable), uint64_t(0));
   4016       return kj::READY_NOW;
   4017     }
   4018   }
   4019 
   4020 private:
   4021   kj::Timer& timer;
   4022   HttpHeaderTable& headerTable;
   4023 };
   4024 
   4025 KJ_TEST("canceling a length stream mid-read correctly discards rest of request") {
   4026   KJ_HTTP_TEST_SETUP_IO;
   4027   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   4028   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   4029 
   4030   HttpHeaderTable table;
   4031   ReadCancelHttpService service(timer, table);
   4032   HttpServer server(timer, table, service);
   4033 
   4034   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   4035 
   4036   {
   4037     static constexpr kj::StringPtr REQUEST =
   4038         "POST / HTTP/1.1\r\n"
   4039         "Content-Length: 6\r\n"
   4040         "\r\n"
   4041         "fooba"_kj;  // incomplete
   4042     pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope);
   4043 
   4044     auto promise = expectRead(*pipe.ends[1],
   4045         "HTTP/1.1 408 Request Timeout\r\n"
   4046         "Content-Length: 0\r\n"
   4047         "\r\n"_kj);
   4048 
   4049     KJ_EXPECT(!promise.poll(waitScope));
   4050 
   4051     // Trigger timout, then response should be sent.
   4052     timer.advanceTo(timer.now() + 1 * kj::MILLISECONDS);
   4053     KJ_ASSERT(promise.poll(waitScope));
   4054     promise.wait(waitScope);
   4055   }
   4056 
   4057   // We left our request stream hanging. The server will try to read and discard the request body.
   4058   // Let's give it the rest of the data, followed by a second request.
   4059   {
   4060     static constexpr kj::StringPtr REQUEST =
   4061         "r"
   4062         "GET / HTTP/1.1\r\n"
   4063         "\r\n"_kj;
   4064     pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope);
   4065 
   4066     auto promise = expectRead(*pipe.ends[1],
   4067         "HTTP/1.1 200 OK\r\n"
   4068         "Content-Length: 0\r\n"
   4069         "\r\n"_kj);
   4070     KJ_ASSERT(promise.poll(waitScope));
   4071     promise.wait(waitScope);
   4072   }
   4073 }
   4074 
   4075 KJ_TEST("canceling a chunked stream mid-read correctly discards rest of request") {
   4076   KJ_HTTP_TEST_SETUP_IO;
   4077   kj::TimerImpl timer(kj::origin<kj::TimePoint>());
   4078   auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
   4079 
   4080   HttpHeaderTable table;
   4081   ReadCancelHttpService service(timer, table);
   4082   HttpServer server(timer, table, service);
   4083 
   4084   auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
   4085 
   4086   {
   4087     static constexpr kj::StringPtr REQUEST =
   4088         "POST / HTTP/1.1\r\n"
   4089         "Transfer-Encoding: chunked\r\n"
   4090         "\r\n"
   4091         "6\r\n"
   4092         "fooba"_kj;  // incomplete chunk
   4093     pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope);
   4094 
   4095     auto promise = expectRead(*pipe.ends[1],
   4096         "HTTP/1.1 408 Request Timeout\r\n"
   4097         "Content-Length: 0\r\n"
   4098         "\r\n"_kj);
   4099 
   4100     KJ_EXPECT(!promise.poll(waitScope));
   4101 
   4102     // Trigger timout, then response should be sent.
   4103     timer.advanceTo(timer.now() + 1 * kj::MILLISECONDS);
   4104     KJ_ASSERT(promise.poll(waitScope));
   4105     promise.wait(waitScope);
   4106   }
   4107 
   4108   // We left our request stream hanging. The server will try to read and discard the request body.
   4109   // Let's give it the rest of the data, followed by a second request.
   4110   {
   4111     static constexpr kj::StringPtr REQUEST =
   4112         "r\r\n"
   4113         "4a\r\n"
   4114         "this is some text that is the body of a chunk and not a valid chunk header\r\n"
   4115         "0\r\n"
   4116         "\r\n"
   4117         "GET / HTTP/1.1\r\n"
   4118         "\r\n"_kj;
   4119     pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope);
   4120 
   4121     auto promise = expectRead(*pipe.ends[1],
   4122         "HTTP/1.1 200 OK\r\n"
   4123         "Content-Length: 0\r\n"
   4124         "\r\n"_kj);
   4125     KJ_ASSERT(promise.poll(waitScope));
   4126     promise.wait(waitScope);
   4127   }
   4128 }
   4129 
   4130 }  // namespace
   4131 }  // namespace kj