http-test.c++ (134395B)
1 // Copyright (c) 2017 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #define KJ_TESTING_KJ 1 23 24 #include "http.h" 25 #include <kj/debug.h> 26 #include <kj/test.h> 27 #include <kj/encoding.h> 28 #include <map> 29 30 #if KJ_HTTP_TEST_USE_OS_PIPE 31 // Run the test using OS-level socketpairs. (See http-socketpair-test.c++.) 32 #define KJ_HTTP_TEST_SETUP_IO \ 33 auto io = kj::setupAsyncIo(); \ 34 auto& waitScope KJ_UNUSED = io.waitScope 35 #define KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR \ 36 auto listener = io.provider->getNetwork().parseAddress("localhost", 0) \ 37 .wait(waitScope)->listen(); \ 38 auto addr = io.provider->getNetwork().parseAddress("localhost", listener->getPort()) \ 39 .wait(waitScope) 40 #define KJ_HTTP_TEST_CREATE_2PIPE \ 41 io.provider->newTwoWayPipe() 42 #else 43 // Run the test using in-process two-way pipes. 44 #define KJ_HTTP_TEST_SETUP_IO \ 45 kj::EventLoop eventLoop; \ 46 kj::WaitScope waitScope(eventLoop) 47 #define KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR \ 48 auto capPipe = newCapabilityPipe(); \ 49 auto listener = kj::heap<kj::CapabilityStreamConnectionReceiver>(*capPipe.ends[0]); \ 50 auto addr = kj::heap<kj::CapabilityStreamNetworkAddress>(nullptr, *capPipe.ends[1]) 51 #define KJ_HTTP_TEST_CREATE_2PIPE \ 52 kj::newTwoWayPipe() 53 #endif 54 55 namespace kj { 56 namespace { 57 58 KJ_TEST("HttpMethod parse / stringify") { 59 #define TRY(name) \ 60 KJ_EXPECT(kj::str(HttpMethod::name) == #name); \ 61 KJ_IF_MAYBE(parsed, tryParseHttpMethod(#name)) { \ 62 KJ_EXPECT(*parsed == HttpMethod::name); \ 63 } else { \ 64 KJ_FAIL_EXPECT("couldn't parse \"" #name "\" as HttpMethod"); \ 65 } 66 67 KJ_HTTP_FOR_EACH_METHOD(TRY) 68 #undef TRY 69 70 KJ_EXPECT(tryParseHttpMethod("FOO") == nullptr); 71 KJ_EXPECT(tryParseHttpMethod("") == nullptr); 72 KJ_EXPECT(tryParseHttpMethod("G") == nullptr); 73 KJ_EXPECT(tryParseHttpMethod("GE") == nullptr); 74 KJ_EXPECT(tryParseHttpMethod("GET ") == nullptr); 75 KJ_EXPECT(tryParseHttpMethod("get") == nullptr); 76 } 77 78 KJ_TEST("HttpHeaderTable") { 79 HttpHeaderTable::Builder builder; 80 81 auto host = builder.add("Host"); 82 auto host2 = builder.add("hOsT"); 83 auto fooBar = builder.add("Foo-Bar"); 84 auto bazQux = builder.add("baz-qux"); 85 auto bazQux2 = builder.add("Baz-Qux"); 86 87 auto table = builder.build(); 88 89 uint builtinHeaderCount = 0; 90 #define INCREMENT(id, name) ++builtinHeaderCount; 91 KJ_HTTP_FOR_EACH_BUILTIN_HEADER(INCREMENT) 92 #undef INCREMENT 93 94 KJ_EXPECT(table->idCount() == builtinHeaderCount + 2); 95 96 KJ_EXPECT(host == HttpHeaderId::HOST); 97 KJ_EXPECT(host != HttpHeaderId::DATE); 98 KJ_EXPECT(host2 == host); 99 100 KJ_EXPECT(host != fooBar); 101 KJ_EXPECT(host != bazQux); 102 KJ_EXPECT(fooBar != bazQux); 103 KJ_EXPECT(bazQux == bazQux2); 104 105 KJ_EXPECT(kj::str(host) == "Host"); 106 KJ_EXPECT(kj::str(host2) == "Host"); 107 KJ_EXPECT(kj::str(fooBar) == "Foo-Bar"); 108 KJ_EXPECT(kj::str(bazQux) == "baz-qux"); 109 KJ_EXPECT(kj::str(HttpHeaderId::HOST) == "Host"); 110 111 KJ_EXPECT(table->idToString(HttpHeaderId::DATE) == "Date"); 112 KJ_EXPECT(table->idToString(fooBar) == "Foo-Bar"); 113 114 KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Date")) == HttpHeaderId::DATE); 115 KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("dATE")) == HttpHeaderId::DATE); 116 KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Foo-Bar")) == fooBar); 117 KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("foo-BAR")) == fooBar); 118 KJ_EXPECT(table->stringToId("foobar") == nullptr); 119 KJ_EXPECT(table->stringToId("barfoo") == nullptr); 120 } 121 122 KJ_TEST("HttpHeaders::parseRequest") { 123 HttpHeaderTable::Builder builder; 124 125 auto fooBar = builder.add("Foo-Bar"); 126 auto bazQux = builder.add("baz-qux"); 127 128 auto table = builder.build(); 129 130 HttpHeaders headers(*table); 131 auto text = kj::heapString( 132 "POST /some/path \t HTTP/1.1\r\n" 133 "Foo-BaR: Baz\r\n" 134 "Host: example.com\r\n" 135 "Content-Length: 123\r\n" 136 "DATE: early\r\n" 137 "other-Header: yep\r\n" 138 "with.dots: sure\r\n" 139 "\r\n"); 140 auto result = headers.tryParseRequest(text.asArray()).get<HttpHeaders::Request>(); 141 142 KJ_EXPECT(result.method == HttpMethod::POST); 143 KJ_EXPECT(result.url == "/some/path"); 144 KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com"); 145 KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early"); 146 KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz"); 147 KJ_EXPECT(headers.get(bazQux) == nullptr); 148 KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr); 149 KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::CONTENT_LENGTH)) == "123"); 150 KJ_EXPECT(headers.get(HttpHeaderId::TRANSFER_ENCODING) == nullptr); 151 152 std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders; 153 headers.forEach([&](kj::StringPtr name, kj::StringPtr value) { 154 KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second); 155 }); 156 KJ_EXPECT(unpackedHeaders.size() == 6); 157 KJ_EXPECT(unpackedHeaders["Content-Length"] == "123"); 158 KJ_EXPECT(unpackedHeaders["Host"] == "example.com"); 159 KJ_EXPECT(unpackedHeaders["Date"] == "early"); 160 KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz"); 161 KJ_EXPECT(unpackedHeaders["other-Header"] == "yep"); 162 KJ_EXPECT(unpackedHeaders["with.dots"] == "sure"); 163 164 KJ_EXPECT(headers.serializeRequest(result.method, result.url) == 165 "POST /some/path HTTP/1.1\r\n" 166 "Content-Length: 123\r\n" 167 "Host: example.com\r\n" 168 "Date: early\r\n" 169 "Foo-Bar: Baz\r\n" 170 "other-Header: yep\r\n" 171 "with.dots: sure\r\n" 172 "\r\n"); 173 } 174 175 KJ_TEST("HttpHeaders::parseResponse") { 176 HttpHeaderTable::Builder builder; 177 178 auto fooBar = builder.add("Foo-Bar"); 179 auto bazQux = builder.add("baz-qux"); 180 181 auto table = builder.build(); 182 183 HttpHeaders headers(*table); 184 auto text = kj::heapString( 185 "HTTP/1.1\t\t 418\t I'm a teapot\r\n" 186 "Foo-BaR: Baz\r\n" 187 "Host: example.com\r\n" 188 "Content-Length: 123\r\n" 189 "DATE: early\r\n" 190 "other-Header: yep\r\n" 191 "\r\n"); 192 auto result = headers.tryParseResponse(text.asArray()).get<HttpHeaders::Response>(); 193 194 KJ_EXPECT(result.statusCode == 418); 195 KJ_EXPECT(result.statusText == "I'm a teapot"); 196 KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com"); 197 KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early"); 198 KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz"); 199 KJ_EXPECT(headers.get(bazQux) == nullptr); 200 KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr); 201 KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::CONTENT_LENGTH)) == "123"); 202 KJ_EXPECT(headers.get(HttpHeaderId::TRANSFER_ENCODING) == nullptr); 203 204 std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders; 205 headers.forEach([&](kj::StringPtr name, kj::StringPtr value) { 206 KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second); 207 }); 208 KJ_EXPECT(unpackedHeaders.size() == 5); 209 KJ_EXPECT(unpackedHeaders["Content-Length"] == "123"); 210 KJ_EXPECT(unpackedHeaders["Host"] == "example.com"); 211 KJ_EXPECT(unpackedHeaders["Date"] == "early"); 212 KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz"); 213 KJ_EXPECT(unpackedHeaders["other-Header"] == "yep"); 214 215 KJ_EXPECT(headers.serializeResponse( 216 result.statusCode, result.statusText) == 217 "HTTP/1.1 418 I'm a teapot\r\n" 218 "Content-Length: 123\r\n" 219 "Host: example.com\r\n" 220 "Date: early\r\n" 221 "Foo-Bar: Baz\r\n" 222 "other-Header: yep\r\n" 223 "\r\n"); 224 } 225 226 KJ_TEST("HttpHeaders parse invalid") { 227 auto table = HttpHeaderTable::Builder().build(); 228 HttpHeaders headers(*table); 229 230 // NUL byte in request. 231 { 232 auto input = kj::heapString( 233 "POST \0 /some/path \t HTTP/1.1\r\n" 234 "Foo-BaR: Baz\r\n" 235 "Host: example.com\r\n" 236 "DATE: early\r\n" 237 "other-Header: yep\r\n" 238 "\r\n"); 239 240 auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>(); 241 242 KJ_EXPECT(protocolError.description == "Request headers have no terminal newline.", 243 protocolError.description); 244 KJ_EXPECT(protocolError.rawContent.asChars() == input); 245 } 246 247 // Control character in header name. 248 { 249 auto input = kj::heapString( 250 "POST /some/path \t HTTP/1.1\r\n" 251 "Foo-BaR: Baz\r\n" 252 "Cont\001ent-Length: 123\r\n" 253 "DATE: early\r\n" 254 "other-Header: yep\r\n" 255 "\r\n"); 256 257 auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>(); 258 259 KJ_EXPECT(protocolError.description == "The headers sent by your client are not valid.", 260 protocolError.description); 261 KJ_EXPECT(protocolError.rawContent.asChars() == input); 262 } 263 264 // Separator character in header name. 265 { 266 auto input = kj::heapString( 267 "POST /some/path \t HTTP/1.1\r\n" 268 "Foo-BaR: Baz\r\n" 269 "Host: example.com\r\n" 270 "DATE/: early\r\n" 271 "other-Header: yep\r\n" 272 "\r\n"); 273 274 auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>(); 275 276 KJ_EXPECT(protocolError.description == "The headers sent by your client are not valid.", 277 protocolError.description); 278 KJ_EXPECT(protocolError.rawContent.asChars() == input); 279 } 280 281 // Response status code not numeric. 282 { 283 auto input = kj::heapString( 284 "HTTP/1.1\t\t abc\t I'm a teapot\r\n" 285 "Foo-BaR: Baz\r\n" 286 "Host: example.com\r\n" 287 "DATE: early\r\n" 288 "other-Header: yep\r\n" 289 "\r\n"); 290 291 auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>(); 292 293 KJ_EXPECT(protocolError.description == "Unrecognized request method.", 294 protocolError.description); 295 KJ_EXPECT(protocolError.rawContent.asChars() == input); 296 } 297 } 298 299 KJ_TEST("HttpHeaders validation") { 300 auto table = HttpHeaderTable::Builder().build(); 301 HttpHeaders headers(*table); 302 303 headers.add("Valid-Name", "valid value"); 304 305 // The HTTP RFC prohibits control characters, but browsers only prohibit \0, \r, and \n. KJ goes 306 // with the browsers for compatibility. 307 headers.add("Valid-Name", "valid\x01value"); 308 309 // The HTTP RFC does not permit non-ASCII values. 310 // KJ chooses to interpret them as UTF-8, to avoid the need for any expensive conversion. 311 // Browsers apparently interpret them as LATIN-1. Applications can reinterpet these strings as 312 // LATIN-1 easily enough if they really need to. 313 headers.add("Valid-Name", u8"valid€value"); 314 315 KJ_EXPECT_THROW_MESSAGE("invalid header name", headers.add("Invalid Name", "value")); 316 KJ_EXPECT_THROW_MESSAGE("invalid header name", headers.add("Invalid@Name", "value")); 317 318 KJ_EXPECT_THROW_MESSAGE("invalid header value", headers.set(HttpHeaderId::HOST, "in\nvalid")); 319 KJ_EXPECT_THROW_MESSAGE("invalid header value", headers.add("Valid-Name", "in\nvalid")); 320 } 321 322 KJ_TEST("HttpHeaders Set-Cookie handling") { 323 HttpHeaderTable::Builder builder; 324 auto hCookie = builder.add("Cookie"); 325 auto hSetCookie = builder.add("Set-Cookie"); 326 auto table = builder.build(); 327 328 HttpHeaders headers(*table); 329 headers.set(hCookie, "Foo"); 330 headers.add("Cookie", "Bar"); 331 headers.add("Cookie", "Baz"); 332 headers.set(hSetCookie, "Foo"); 333 headers.add("Set-Cookie", "Bar"); 334 headers.add("Set-Cookie", "Baz"); 335 336 auto text = headers.toString(); 337 KJ_EXPECT(text == 338 "Cookie: Foo, Bar, Baz\r\n" 339 "Set-Cookie: Foo\r\n" 340 "Set-Cookie: Bar\r\n" 341 "Set-Cookie: Baz\r\n" 342 "\r\n", text); 343 } 344 345 // ======================================================================================= 346 347 class ReadFragmenter final: public kj::AsyncIoStream { 348 public: 349 ReadFragmenter(AsyncIoStream& inner, size_t limit): inner(inner), limit(limit) {} 350 351 Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override { 352 return inner.read(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes))); 353 } 354 Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { 355 return inner.tryRead(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes))); 356 } 357 358 Maybe<uint64_t> tryGetLength() override { return inner.tryGetLength(); } 359 360 Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override { 361 return inner.pumpTo(output, amount); 362 } 363 364 Promise<void> write(const void* buffer, size_t size) override { 365 return inner.write(buffer, size); 366 } 367 Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override { 368 return inner.write(pieces); 369 } 370 371 Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override { 372 return inner.tryPumpFrom(input, amount); 373 } 374 375 Promise<void> whenWriteDisconnected() override { 376 return inner.whenWriteDisconnected(); 377 } 378 379 void shutdownWrite() override { 380 return inner.shutdownWrite(); 381 } 382 383 void abortRead() override { return inner.abortRead(); } 384 385 void getsockopt(int level, int option, void* value, uint* length) override { 386 return inner.getsockopt(level, option, value, length); 387 } 388 void setsockopt(int level, int option, const void* value, uint length) override { 389 return inner.setsockopt(level, option, value, length); 390 } 391 392 void getsockname(struct sockaddr* addr, uint* length) override { 393 return inner.getsockname(addr, length); 394 } 395 void getpeername(struct sockaddr* addr, uint* length) override { 396 return inner.getsockname(addr, length); 397 } 398 399 private: 400 kj::AsyncIoStream& inner; 401 size_t limit; 402 }; 403 404 template <typename T> 405 class InitializeableArray: public Array<T> { 406 public: 407 InitializeableArray(std::initializer_list<T> init) 408 : Array<T>(kj::heapArray(init)) {} 409 }; 410 411 enum Side { BOTH, CLIENT_ONLY, SERVER_ONLY }; 412 413 struct HeaderTestCase { 414 HttpHeaderId id; 415 kj::StringPtr value; 416 }; 417 418 struct HttpRequestTestCase { 419 kj::StringPtr raw; 420 421 HttpMethod method; 422 kj::StringPtr path; 423 InitializeableArray<HeaderTestCase> requestHeaders; 424 kj::Maybe<uint64_t> requestBodySize; 425 InitializeableArray<kj::StringPtr> requestBodyParts; 426 427 Side side = BOTH; 428 429 // TODO(cleanup): Delete this constructor if/when we move to C++14. 430 HttpRequestTestCase(kj::StringPtr raw, HttpMethod method, kj::StringPtr path, 431 InitializeableArray<HeaderTestCase> requestHeaders, 432 kj::Maybe<uint64_t> requestBodySize, 433 InitializeableArray<kj::StringPtr> requestBodyParts, 434 Side side = BOTH) 435 : raw(raw), method(method), path(path), requestHeaders(kj::mv(requestHeaders)), 436 requestBodySize(requestBodySize), requestBodyParts(kj::mv(requestBodyParts)), 437 side(side) {} 438 }; 439 440 struct HttpResponseTestCase { 441 kj::StringPtr raw; 442 443 uint64_t statusCode; 444 kj::StringPtr statusText; 445 InitializeableArray<HeaderTestCase> responseHeaders; 446 kj::Maybe<uint64_t> responseBodySize; 447 InitializeableArray<kj::StringPtr> responseBodyParts; 448 449 HttpMethod method = HttpMethod::GET; 450 451 Side side = BOTH; 452 453 // TODO(cleanup): Delete this constructor if/when we move to C++14. 454 HttpResponseTestCase(kj::StringPtr raw, uint64_t statusCode, kj::StringPtr statusText, 455 InitializeableArray<HeaderTestCase> responseHeaders, 456 kj::Maybe<uint64_t> responseBodySize, 457 InitializeableArray<kj::StringPtr> responseBodyParts, 458 HttpMethod method = HttpMethod::GET, 459 Side side = BOTH) 460 : raw(raw), statusCode(statusCode), statusText(statusText), 461 responseHeaders(kj::mv(responseHeaders)), responseBodySize(responseBodySize), 462 responseBodyParts(kj::mv(responseBodyParts)), method(method), side(side) {} 463 }; 464 465 struct HttpTestCase { 466 HttpRequestTestCase request; 467 HttpResponseTestCase response; 468 }; 469 470 kj::Promise<void> writeEach(kj::AsyncOutputStream& out, kj::ArrayPtr<const kj::StringPtr> parts) { 471 if (parts.size() == 0) return kj::READY_NOW; 472 473 return out.write(parts[0].begin(), parts[0].size()) 474 .then([&out,parts]() { 475 return writeEach(out, parts.slice(1, parts.size())); 476 }); 477 } 478 479 kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) { 480 if (expected.size() == 0) return kj::READY_NOW; 481 482 auto buffer = kj::heapArray<char>(expected.size()); 483 484 auto promise = in.tryRead(buffer.begin(), 1, buffer.size()); 485 return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) { 486 if (amount == 0) { 487 KJ_FAIL_ASSERT("expected data never sent", expected); 488 } 489 490 auto actual = buffer.slice(0, amount); 491 if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) { 492 KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual); 493 } 494 495 return expectRead(in, expected.slice(amount)); 496 })); 497 } 498 499 kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::ArrayPtr<const byte> expected) { 500 if (expected.size() == 0) return kj::READY_NOW; 501 502 auto buffer = kj::heapArray<byte>(expected.size()); 503 504 auto promise = in.tryRead(buffer.begin(), 1, buffer.size()); 505 return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<byte> buffer, size_t amount) { 506 if (amount == 0) { 507 KJ_FAIL_ASSERT("expected data never sent", expected); 508 } 509 510 auto actual = buffer.slice(0, amount); 511 if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) { 512 KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual); 513 } 514 515 return expectRead(in, expected.slice(amount, expected.size())); 516 })); 517 } 518 519 kj::Promise<void> expectEnd(kj::AsyncInputStream& in) { 520 static char buffer; 521 522 auto promise = in.tryRead(&buffer, 1, 1); 523 return promise.then([](size_t amount) { 524 KJ_ASSERT(amount == 0, "expected EOF"); 525 }); 526 } 527 528 void testHttpClientRequest(kj::WaitScope& waitScope, const HttpRequestTestCase& testCase, 529 kj::TwoWayPipe pipe) { 530 531 auto serverTask = expectRead(*pipe.ends[1], testCase.raw).then([&]() { 532 static const char SIMPLE_RESPONSE[] = 533 "HTTP/1.1 200 OK\r\n" 534 "Content-Length: 0\r\n" 535 "\r\n"; 536 return pipe.ends[1]->write(SIMPLE_RESPONSE, strlen(SIMPLE_RESPONSE)); 537 }).then([&]() -> kj::Promise<void> { 538 return kj::NEVER_DONE; 539 }); 540 541 HttpHeaderTable table; 542 auto client = newHttpClient(table, *pipe.ends[0]); 543 544 HttpHeaders headers(table); 545 for (auto& header: testCase.requestHeaders) { 546 headers.set(header.id, header.value); 547 } 548 549 auto request = client->request(testCase.method, testCase.path, headers, testCase.requestBodySize); 550 if (testCase.requestBodyParts.size() > 0) { 551 writeEach(*request.body, testCase.requestBodyParts).wait(waitScope); 552 } 553 request.body = nullptr; 554 auto clientTask = request.response 555 .then([&](HttpClient::Response&& response) { 556 auto promise = response.body->readAllText(); 557 return promise.attach(kj::mv(response.body)); 558 }).ignoreResult(); 559 560 serverTask.exclusiveJoin(kj::mv(clientTask)).wait(waitScope); 561 562 // Verify no more data written by client. 563 client = nullptr; 564 pipe.ends[0]->shutdownWrite(); 565 KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == ""); 566 } 567 568 void testHttpClientResponse(kj::WaitScope& waitScope, const HttpResponseTestCase& testCase, 569 size_t readFragmentSize, kj::TwoWayPipe pipe) { 570 ReadFragmenter fragmenter(*pipe.ends[0], readFragmentSize); 571 572 auto expectedReqText = testCase.method == HttpMethod::GET || testCase.method == HttpMethod::HEAD 573 ? kj::str(testCase.method, " / HTTP/1.1\r\n\r\n") 574 : kj::str(testCase.method, " / HTTP/1.1\r\nContent-Length: 0\r\n"); 575 576 auto serverTask = expectRead(*pipe.ends[1], expectedReqText).then([&]() { 577 return pipe.ends[1]->write(testCase.raw.begin(), testCase.raw.size()); 578 }).then([&]() -> kj::Promise<void> { 579 pipe.ends[1]->shutdownWrite(); 580 return kj::NEVER_DONE; 581 }); 582 583 HttpHeaderTable table; 584 auto client = newHttpClient(table, fragmenter); 585 586 HttpHeaders headers(table); 587 auto request = client->request(testCase.method, "/", headers, uint64_t(0)); 588 request.body = nullptr; 589 auto clientTask = request.response 590 .then([&](HttpClient::Response&& response) { 591 KJ_EXPECT(response.statusCode == testCase.statusCode); 592 KJ_EXPECT(response.statusText == testCase.statusText); 593 594 for (auto& header: testCase.responseHeaders) { 595 KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(header.id)) == header.value); 596 } 597 auto promise = response.body->readAllText(); 598 return promise.attach(kj::mv(response.body)); 599 }).then([&](kj::String body) { 600 KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, ""), body); 601 }); 602 603 serverTask.exclusiveJoin(kj::mv(clientTask)).wait(waitScope); 604 605 // Verify no more data written by client. 606 client = nullptr; 607 pipe.ends[0]->shutdownWrite(); 608 KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == ""); 609 } 610 611 void testHttpClient(kj::WaitScope& waitScope, HttpHeaderTable& table, 612 HttpClient& client, const HttpTestCase& testCase) { 613 KJ_CONTEXT(testCase.request.raw, testCase.response.raw); 614 615 HttpHeaders headers(table); 616 for (auto& header: testCase.request.requestHeaders) { 617 headers.set(header.id, header.value); 618 } 619 620 auto request = client.request( 621 testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize); 622 for (auto& part: testCase.request.requestBodyParts) { 623 request.body->write(part.begin(), part.size()).wait(waitScope); 624 } 625 request.body = nullptr; 626 627 auto response = request.response.wait(waitScope); 628 629 KJ_EXPECT(response.statusCode == testCase.response.statusCode); 630 auto body = response.body->readAllText().wait(waitScope); 631 if (testCase.request.method == HttpMethod::HEAD) { 632 KJ_EXPECT(body == ""); 633 } else { 634 KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body); 635 } 636 } 637 638 class TestHttpService final: public HttpService { 639 public: 640 TestHttpService(const HttpRequestTestCase& expectedRequest, 641 const HttpResponseTestCase& response, 642 HttpHeaderTable& table) 643 : singleExpectedRequest(&expectedRequest), 644 singleResponse(&response), 645 responseHeaders(table) {} 646 TestHttpService(kj::ArrayPtr<const HttpTestCase> testCases, 647 HttpHeaderTable& table) 648 : singleExpectedRequest(nullptr), 649 singleResponse(nullptr), 650 testCases(testCases), 651 responseHeaders(table) {} 652 653 uint getRequestCount() { return requestCount; } 654 655 kj::Promise<void> request( 656 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 657 kj::AsyncInputStream& requestBody, Response& responseSender) override { 658 auto& expectedRequest = testCases == nullptr ? *singleExpectedRequest : 659 testCases[requestCount % testCases.size()].request; 660 auto& response = testCases == nullptr ? *singleResponse : 661 testCases[requestCount % testCases.size()].response; 662 663 ++requestCount; 664 665 KJ_EXPECT(method == expectedRequest.method, method); 666 KJ_EXPECT(url == expectedRequest.path, url); 667 668 for (auto& header: expectedRequest.requestHeaders) { 669 KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(header.id)) == header.value); 670 } 671 672 auto size = requestBody.tryGetLength(); 673 KJ_IF_MAYBE(expectedSize, expectedRequest.requestBodySize) { 674 KJ_IF_MAYBE(s, size) { 675 KJ_EXPECT(*s == *expectedSize, *s); 676 } else { 677 KJ_FAIL_EXPECT("tryGetLength() returned nullptr; expected known size"); 678 } 679 } else { 680 KJ_EXPECT(size == nullptr); 681 } 682 683 return requestBody.readAllText() 684 .then([this,&expectedRequest,&response,&responseSender](kj::String text) { 685 KJ_EXPECT(text == kj::strArray(expectedRequest.requestBodyParts, ""), text); 686 687 responseHeaders.clear(); 688 for (auto& header: response.responseHeaders) { 689 responseHeaders.set(header.id, header.value); 690 } 691 692 auto stream = responseSender.send(response.statusCode, response.statusText, 693 responseHeaders, response.responseBodySize); 694 auto promise = writeEach(*stream, response.responseBodyParts); 695 return promise.attach(kj::mv(stream)); 696 }); 697 } 698 699 private: 700 const HttpRequestTestCase* singleExpectedRequest; 701 const HttpResponseTestCase* singleResponse; 702 kj::ArrayPtr<const HttpTestCase> testCases; 703 HttpHeaders responseHeaders; 704 uint requestCount = 0; 705 }; 706 707 void testHttpServerRequest(kj::WaitScope& waitScope, kj::Timer& timer, 708 const HttpRequestTestCase& requestCase, 709 const HttpResponseTestCase& responseCase, 710 kj::TwoWayPipe pipe) { 711 HttpHeaderTable table; 712 TestHttpService service(requestCase, responseCase, table); 713 HttpServer server(timer, table, service); 714 715 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 716 717 pipe.ends[1]->write(requestCase.raw.begin(), requestCase.raw.size()).wait(waitScope); 718 pipe.ends[1]->shutdownWrite(); 719 720 expectRead(*pipe.ends[1], responseCase.raw).wait(waitScope); 721 722 listenTask.wait(waitScope); 723 724 KJ_EXPECT(service.getRequestCount() == 1); 725 } 726 727 kj::ArrayPtr<const HttpRequestTestCase> requestTestCases() { 728 static const auto HUGE_STRING = kj::strArray(kj::repeat("abcdefgh", 4096), ""); 729 static const auto HUGE_REQUEST = kj::str( 730 "GET / HTTP/1.1\r\n" 731 "Host: ", HUGE_STRING, "\r\n" 732 "\r\n"); 733 734 static const HttpRequestTestCase REQUEST_TEST_CASES[] { 735 { 736 "GET /foo/bar HTTP/1.1\r\n" 737 "Host: example.com\r\n" 738 "\r\n", 739 740 HttpMethod::GET, 741 "/foo/bar", 742 {{HttpHeaderId::HOST, "example.com"}}, 743 uint64_t(0), {}, 744 }, 745 746 { 747 "HEAD /foo/bar HTTP/1.1\r\n" 748 "Host: example.com\r\n" 749 "\r\n", 750 751 HttpMethod::HEAD, 752 "/foo/bar", 753 {{HttpHeaderId::HOST, "example.com"}}, 754 uint64_t(0), {}, 755 }, 756 757 { 758 "POST / HTTP/1.1\r\n" 759 "Content-Length: 9\r\n" 760 "Host: example.com\r\n" 761 "Content-Type: text/plain\r\n" 762 "\r\n" 763 "foobarbaz", 764 765 HttpMethod::POST, 766 "/", 767 { 768 {HttpHeaderId::HOST, "example.com"}, 769 {HttpHeaderId::CONTENT_TYPE, "text/plain"}, 770 }, 771 9, { "foo", "bar", "baz" }, 772 }, 773 774 { 775 "POST / HTTP/1.1\r\n" 776 "Transfer-Encoding: chunked\r\n" 777 "Host: example.com\r\n" 778 "Content-Type: text/plain\r\n" 779 "\r\n" 780 "3\r\n" 781 "foo\r\n" 782 "6\r\n" 783 "barbaz\r\n" 784 "0\r\n" 785 "\r\n", 786 787 HttpMethod::POST, 788 "/", 789 { 790 {HttpHeaderId::HOST, "example.com"}, 791 {HttpHeaderId::CONTENT_TYPE, "text/plain"}, 792 }, 793 nullptr, { "foo", "barbaz" }, 794 }, 795 796 { 797 "POST / HTTP/1.1\r\n" 798 "Transfer-Encoding: chunked\r\n" 799 "Host: example.com\r\n" 800 "Content-Type: text/plain\r\n" 801 "\r\n" 802 "1d\r\n" 803 "0123456789abcdef0123456789abc\r\n" 804 "0\r\n" 805 "\r\n", 806 807 HttpMethod::POST, 808 "/", 809 { 810 {HttpHeaderId::HOST, "example.com"}, 811 {HttpHeaderId::CONTENT_TYPE, "text/plain"}, 812 }, 813 nullptr, { "0123456789abcdef0123456789abc" }, 814 }, 815 816 { 817 HUGE_REQUEST, 818 819 HttpMethod::GET, 820 "/", 821 {{HttpHeaderId::HOST, HUGE_STRING}}, 822 uint64_t(0), {} 823 }, 824 825 { 826 "GET /foo/bar HTTP/1.1\r\n" 827 "Content-Length: 6\r\n" 828 "Host: example.com\r\n" 829 "\r\n" 830 "foobar", 831 832 HttpMethod::GET, 833 "/foo/bar", 834 {{HttpHeaderId::HOST, "example.com"}}, 835 uint64_t(6), { "foobar" }, 836 }, 837 838 { 839 "GET /foo/bar HTTP/1.1\r\n" 840 "Transfer-Encoding: chunked\r\n" 841 "Host: example.com\r\n" 842 "\r\n" 843 "3\r\n" 844 "foo\r\n" 845 "3\r\n" 846 "bar\r\n" 847 "0\r\n" 848 "\r\n", 849 850 HttpMethod::GET, 851 "/foo/bar", 852 {{HttpHeaderId::HOST, "example.com"}, 853 {HttpHeaderId::TRANSFER_ENCODING, "chunked"}}, 854 nullptr, { "foo", "bar" }, 855 } 856 }; 857 858 // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents REQUEST_TEST_CASES from implicitly 859 // casting to our return type. 860 return kj::arrayPtr(REQUEST_TEST_CASES, kj::size(REQUEST_TEST_CASES)); 861 } 862 863 kj::ArrayPtr<const HttpResponseTestCase> responseTestCases() { 864 static const HttpResponseTestCase RESPONSE_TEST_CASES[] { 865 { 866 "HTTP/1.1 200 OK\r\n" 867 "Content-Type: text/plain\r\n" 868 "Connection: close\r\n" 869 "\r\n" 870 "baz qux", 871 872 200, "OK", 873 {{HttpHeaderId::CONTENT_TYPE, "text/plain"}}, 874 nullptr, {"baz qux"}, 875 876 HttpMethod::GET, 877 CLIENT_ONLY, // Server never sends connection: close 878 }, 879 880 { 881 "HTTP/1.1 200 OK\r\n" 882 "Content-Type: text/plain\r\n" 883 "Transfer-Encoding: identity\r\n" 884 "Content-Length: foobar\r\n" // intentionally wrong 885 "\r\n" 886 "baz qux", 887 888 200, "OK", 889 {{HttpHeaderId::CONTENT_TYPE, "text/plain"}}, 890 nullptr, {"baz qux"}, 891 892 HttpMethod::GET, 893 CLIENT_ONLY, // Server never sends transfer-encoding: identity 894 }, 895 896 { 897 "HTTP/1.1 200 OK\r\n" 898 "Content-Type: text/plain\r\n" 899 "\r\n" 900 "baz qux", 901 902 200, "OK", 903 {{HttpHeaderId::CONTENT_TYPE, "text/plain"}}, 904 nullptr, {"baz qux"}, 905 906 HttpMethod::GET, 907 CLIENT_ONLY, // Server never sends non-delimited message 908 }, 909 910 { 911 "HTTP/1.1 200 OK\r\n" 912 "Content-Length: 123\r\n" 913 "Content-Type: text/plain\r\n" 914 "\r\n", 915 916 200, "OK", 917 {{HttpHeaderId::CONTENT_TYPE, "text/plain"}}, 918 123, {}, 919 920 HttpMethod::HEAD, 921 }, 922 923 { 924 "HTTP/1.1 200 OK\r\n" 925 "Content-Length: foobar\r\n" 926 "Content-Type: text/plain\r\n" 927 "\r\n", 928 929 200, "OK", 930 {{HttpHeaderId::CONTENT_TYPE, "text/plain"}, 931 {HttpHeaderId::CONTENT_LENGTH, "foobar"}}, 932 123, {}, 933 934 HttpMethod::HEAD, 935 }, 936 937 // Zero-length expected size response to HEAD request has no Content-Length header. 938 { 939 "HTTP/1.1 200 OK\r\n" 940 "\r\n", 941 942 200, "OK", 943 {}, 944 uint64_t(0), {}, 945 946 HttpMethod::HEAD, 947 }, 948 949 { 950 "HTTP/1.1 204 No Content\r\n" 951 "\r\n", 952 953 204, "No Content", 954 {}, 955 uint64_t(0), {}, 956 }, 957 958 { 959 "HTTP/1.1 205 Reset Content\r\n" 960 "Content-Length: 0\r\n" 961 "\r\n", 962 963 205, "Reset Content", 964 {}, 965 uint64_t(0), {}, 966 }, 967 968 { 969 "HTTP/1.1 304 Not Modified\r\n" 970 "\r\n", 971 972 304, "Not Modified", 973 {}, 974 uint64_t(0), {}, 975 }, 976 977 { 978 "HTTP/1.1 200 OK\r\n" 979 "Content-Length: 8\r\n" 980 "Content-Type: text/plain\r\n" 981 "\r\n" 982 "quxcorge", 983 984 200, "OK", 985 {{HttpHeaderId::CONTENT_TYPE, "text/plain"}}, 986 8, { "qux", "corge" } 987 }, 988 989 { 990 "HTTP/1.1 200 OK\r\n" 991 "Transfer-Encoding: chunked\r\n" 992 "Content-Type: text/plain\r\n" 993 "\r\n" 994 "3\r\n" 995 "qux\r\n" 996 "5\r\n" 997 "corge\r\n" 998 "0\r\n" 999 "\r\n", 1000 1001 200, "OK", 1002 {{HttpHeaderId::CONTENT_TYPE, "text/plain"}}, 1003 nullptr, { "qux", "corge" } 1004 }, 1005 }; 1006 1007 // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents RESPONSE_TEST_CASES from implicitly 1008 // casting to our return type. 1009 return kj::arrayPtr(RESPONSE_TEST_CASES, kj::size(RESPONSE_TEST_CASES)); 1010 } 1011 1012 KJ_TEST("HttpClient requests") { 1013 KJ_HTTP_TEST_SETUP_IO; 1014 1015 for (auto& testCase: requestTestCases()) { 1016 if (testCase.side == SERVER_ONLY) continue; 1017 KJ_CONTEXT(testCase.raw); 1018 testHttpClientRequest(waitScope, testCase, KJ_HTTP_TEST_CREATE_2PIPE); 1019 } 1020 } 1021 1022 KJ_TEST("HttpClient responses") { 1023 KJ_HTTP_TEST_SETUP_IO; 1024 size_t FRAGMENT_SIZES[] = { 1, 2, 3, 4, 5, 6, 7, 8, 16, 31, kj::maxValue }; 1025 1026 for (auto& testCase: responseTestCases()) { 1027 if (testCase.side == SERVER_ONLY) continue; 1028 for (size_t fragmentSize: FRAGMENT_SIZES) { 1029 KJ_CONTEXT(testCase.raw, fragmentSize); 1030 testHttpClientResponse(waitScope, testCase, fragmentSize, KJ_HTTP_TEST_CREATE_2PIPE); 1031 } 1032 } 1033 } 1034 1035 KJ_TEST("HttpClient canceled write") { 1036 KJ_HTTP_TEST_SETUP_IO; 1037 1038 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1039 1040 auto serverPromise = pipe.ends[1]->readAllText(); 1041 1042 { 1043 HttpHeaderTable table; 1044 auto client = newHttpClient(table, *pipe.ends[0]); 1045 1046 auto body = kj::heapArray<byte>(4096); 1047 memset(body.begin(), 0xcf, body.size()); 1048 1049 auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table), uint64_t(4096)); 1050 1051 // Note: This poll() forces the server to receive what was written so far. Otherwise, 1052 // cancelling the write below may in fact cancel the previous write as well. 1053 KJ_EXPECT(!serverPromise.poll(waitScope)); 1054 1055 // Start a write and immediately cancel it. 1056 { 1057 auto ignore KJ_UNUSED = req.body->write(body.begin(), body.size()); 1058 } 1059 1060 KJ_EXPECT_THROW_MESSAGE("overwrote", req.body->write("foo", 3).wait(waitScope)); 1061 req.body = nullptr; 1062 1063 KJ_EXPECT(!serverPromise.poll(waitScope)); 1064 1065 KJ_EXPECT_THROW_MESSAGE("can't start new request until previous request body", 1066 client->request(HttpMethod::GET, "/", HttpHeaders(table)).response.wait(waitScope)); 1067 } 1068 1069 pipe.ends[0]->shutdownWrite(); 1070 auto text = serverPromise.wait(waitScope); 1071 KJ_EXPECT(text == "POST / HTTP/1.1\r\nContent-Length: 4096\r\n\r\n", text); 1072 } 1073 1074 KJ_TEST("HttpClient chunked body gather-write") { 1075 KJ_HTTP_TEST_SETUP_IO; 1076 1077 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1078 1079 auto serverPromise = pipe.ends[1]->readAllText(); 1080 1081 { 1082 HttpHeaderTable table; 1083 auto client = newHttpClient(table, *pipe.ends[0]); 1084 1085 auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table)); 1086 1087 kj::ArrayPtr<const byte> bodyParts[] = { 1088 "foo"_kj.asBytes(), " "_kj.asBytes(), 1089 "bar"_kj.asBytes(), " "_kj.asBytes(), 1090 "baz"_kj.asBytes() 1091 }; 1092 1093 req.body->write(kj::arrayPtr(bodyParts, kj::size(bodyParts))).wait(waitScope); 1094 req.body = nullptr; 1095 1096 // Wait for a response so the client has a chance to end the request body with a 0-chunk. 1097 kj::StringPtr responseText = "HTTP/1.1 204 No Content\r\n\r\n"; 1098 pipe.ends[1]->write(responseText.begin(), responseText.size()).wait(waitScope); 1099 auto response = req.response.wait(waitScope); 1100 } 1101 1102 pipe.ends[0]->shutdownWrite(); 1103 1104 auto text = serverPromise.wait(waitScope); 1105 KJ_EXPECT(text == "POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n" 1106 "b\r\nfoo bar baz\r\n0\r\n\r\n", text); 1107 } 1108 1109 KJ_TEST("HttpClient chunked body pump from fixed length stream") { 1110 class FixedBodyStream final: public kj::AsyncInputStream { 1111 Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { 1112 auto n = kj::min(body.size(), maxBytes); 1113 n = kj::max(n, minBytes); 1114 n = kj::min(n, body.size()); 1115 memcpy(buffer, body.begin(), n); 1116 body = body.slice(n); 1117 return n; 1118 } 1119 1120 Maybe<uint64_t> tryGetLength() override { return body.size(); } 1121 1122 kj::StringPtr body = "foo bar baz"; 1123 }; 1124 1125 KJ_HTTP_TEST_SETUP_IO; 1126 1127 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1128 1129 auto serverPromise = pipe.ends[1]->readAllText(); 1130 1131 { 1132 HttpHeaderTable table; 1133 auto client = newHttpClient(table, *pipe.ends[0]); 1134 1135 auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table)); 1136 1137 FixedBodyStream bodyStream; 1138 bodyStream.pumpTo(*req.body).wait(waitScope); 1139 req.body = nullptr; 1140 1141 // Wait for a response so the client has a chance to end the request body with a 0-chunk. 1142 kj::StringPtr responseText = "HTTP/1.1 204 No Content\r\n\r\n"; 1143 pipe.ends[1]->write(responseText.begin(), responseText.size()).wait(waitScope); 1144 auto response = req.response.wait(waitScope); 1145 } 1146 1147 pipe.ends[0]->shutdownWrite(); 1148 1149 auto text = serverPromise.wait(waitScope); 1150 KJ_EXPECT(text == "POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n" 1151 "b\r\nfoo bar baz\r\n0\r\n\r\n", text); 1152 } 1153 1154 KJ_TEST("HttpServer requests") { 1155 HttpResponseTestCase RESPONSE = { 1156 "HTTP/1.1 200 OK\r\n" 1157 "Content-Length: 3\r\n" 1158 "\r\n" 1159 "foo", 1160 1161 200, "OK", 1162 {}, 1163 3, {"foo"} 1164 }; 1165 1166 HttpResponseTestCase HEAD_RESPONSE = { 1167 "HTTP/1.1 200 OK\r\n" 1168 "Content-Length: 3\r\n" 1169 "\r\n", 1170 1171 200, "OK", 1172 {}, 1173 3, {"foo"} 1174 }; 1175 1176 KJ_HTTP_TEST_SETUP_IO; 1177 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 1178 1179 for (auto& testCase: requestTestCases()) { 1180 if (testCase.side == CLIENT_ONLY) continue; 1181 KJ_CONTEXT(testCase.raw); 1182 testHttpServerRequest(waitScope, timer, testCase, 1183 testCase.method == HttpMethod::HEAD ? HEAD_RESPONSE : RESPONSE, 1184 KJ_HTTP_TEST_CREATE_2PIPE); 1185 } 1186 } 1187 1188 KJ_TEST("HttpServer responses") { 1189 HttpRequestTestCase REQUEST = { 1190 "GET / HTTP/1.1\r\n" 1191 "\r\n", 1192 1193 HttpMethod::GET, 1194 "/", 1195 {}, 1196 uint64_t(0), {}, 1197 }; 1198 1199 HttpRequestTestCase HEAD_REQUEST = { 1200 "HEAD / HTTP/1.1\r\n" 1201 "\r\n", 1202 1203 HttpMethod::HEAD, 1204 "/", 1205 {}, 1206 uint64_t(0), {}, 1207 }; 1208 1209 KJ_HTTP_TEST_SETUP_IO; 1210 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 1211 1212 for (auto& testCase: responseTestCases()) { 1213 if (testCase.side == CLIENT_ONLY) continue; 1214 KJ_CONTEXT(testCase.raw); 1215 testHttpServerRequest(waitScope, timer, 1216 testCase.method == HttpMethod::HEAD ? HEAD_REQUEST : REQUEST, testCase, 1217 KJ_HTTP_TEST_CREATE_2PIPE); 1218 } 1219 } 1220 1221 // ----------------------------------------------------------------------------- 1222 1223 kj::ArrayPtr<const HttpTestCase> pipelineTestCases() { 1224 static const HttpTestCase PIPELINE_TESTS[] = { 1225 { 1226 { 1227 "GET / HTTP/1.1\r\n" 1228 "\r\n", 1229 1230 HttpMethod::GET, "/", {}, uint64_t(0), {}, 1231 }, 1232 { 1233 "HTTP/1.1 200 OK\r\n" 1234 "Content-Length: 7\r\n" 1235 "\r\n" 1236 "foo bar", 1237 1238 200, "OK", {}, 7, { "foo bar" } 1239 }, 1240 }, 1241 1242 { 1243 { 1244 "POST /foo HTTP/1.1\r\n" 1245 "Content-Length: 6\r\n" 1246 "\r\n" 1247 "grault", 1248 1249 HttpMethod::POST, "/foo", {}, 6, { "grault" }, 1250 }, 1251 { 1252 "HTTP/1.1 404 Not Found\r\n" 1253 "Content-Length: 13\r\n" 1254 "\r\n" 1255 "baz qux corge", 1256 1257 404, "Not Found", {}, 13, { "baz qux corge" } 1258 }, 1259 }, 1260 1261 // Throw a zero-size request/response into the pipeline to check for a bug that existed with 1262 // them previously. 1263 { 1264 { 1265 "POST /foo HTTP/1.1\r\n" 1266 "Content-Length: 0\r\n" 1267 "\r\n", 1268 1269 HttpMethod::POST, "/foo", {}, uint64_t(0), {}, 1270 }, 1271 { 1272 "HTTP/1.1 200 OK\r\n" 1273 "Content-Length: 0\r\n" 1274 "\r\n", 1275 1276 200, "OK", {}, uint64_t(0), {} 1277 }, 1278 }, 1279 1280 // Also a zero-size chunked request/response. 1281 { 1282 { 1283 "POST /foo HTTP/1.1\r\n" 1284 "Transfer-Encoding: chunked\r\n" 1285 "\r\n" 1286 "0\r\n" 1287 "\r\n", 1288 1289 HttpMethod::POST, "/foo", {}, nullptr, {}, 1290 }, 1291 { 1292 "HTTP/1.1 200 OK\r\n" 1293 "Transfer-Encoding: chunked\r\n" 1294 "\r\n" 1295 "0\r\n" 1296 "\r\n", 1297 1298 200, "OK", {}, nullptr, {} 1299 }, 1300 }, 1301 1302 { 1303 { 1304 "POST /bar HTTP/1.1\r\n" 1305 "Transfer-Encoding: chunked\r\n" 1306 "\r\n" 1307 "6\r\n" 1308 "garply\r\n" 1309 "5\r\n" 1310 "waldo\r\n" 1311 "0\r\n" 1312 "\r\n", 1313 1314 HttpMethod::POST, "/bar", {}, nullptr, { "garply", "waldo" }, 1315 }, 1316 { 1317 "HTTP/1.1 200 OK\r\n" 1318 "Transfer-Encoding: chunked\r\n" 1319 "\r\n" 1320 "4\r\n" 1321 "fred\r\n" 1322 "5\r\n" 1323 "plugh\r\n" 1324 "0\r\n" 1325 "\r\n", 1326 1327 200, "OK", {}, nullptr, { "fred", "plugh" } 1328 }, 1329 }, 1330 1331 { 1332 { 1333 "HEAD / HTTP/1.1\r\n" 1334 "\r\n", 1335 1336 HttpMethod::HEAD, "/", {}, uint64_t(0), {}, 1337 }, 1338 { 1339 "HTTP/1.1 200 OK\r\n" 1340 "Content-Length: 7\r\n" 1341 "\r\n", 1342 1343 200, "OK", {}, 7, { "foo bar" } 1344 }, 1345 }, 1346 1347 // Zero-length expected size response to HEAD request has no Content-Length header. 1348 { 1349 { 1350 "HEAD / HTTP/1.1\r\n" 1351 "\r\n", 1352 1353 HttpMethod::HEAD, "/", {}, uint64_t(0), {}, 1354 }, 1355 { 1356 "HTTP/1.1 200 OK\r\n" 1357 "\r\n", 1358 1359 200, "OK", {}, uint64_t(0), {}, HttpMethod::HEAD, 1360 }, 1361 }, 1362 }; 1363 1364 // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents RESPONSE_TEST_CASES from implicitly 1365 // casting to our return type. 1366 return kj::arrayPtr(PIPELINE_TESTS, kj::size(PIPELINE_TESTS)); 1367 } 1368 1369 KJ_TEST("HttpClient pipeline") { 1370 auto PIPELINE_TESTS = pipelineTestCases(); 1371 1372 KJ_HTTP_TEST_SETUP_IO; 1373 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1374 1375 kj::Promise<void> writeResponsesPromise = kj::READY_NOW; 1376 for (auto& testCase: PIPELINE_TESTS) { 1377 writeResponsesPromise = writeResponsesPromise 1378 .then([&]() { 1379 return expectRead(*pipe.ends[1], testCase.request.raw); 1380 }).then([&]() { 1381 return pipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size()); 1382 }); 1383 } 1384 1385 HttpHeaderTable table; 1386 auto client = newHttpClient(table, *pipe.ends[0]); 1387 1388 for (auto& testCase: PIPELINE_TESTS) { 1389 testHttpClient(waitScope, table, *client, testCase); 1390 } 1391 1392 client = nullptr; 1393 pipe.ends[0]->shutdownWrite(); 1394 1395 writeResponsesPromise.wait(waitScope); 1396 } 1397 1398 KJ_TEST("HttpClient parallel pipeline") { 1399 auto PIPELINE_TESTS = pipelineTestCases(); 1400 1401 KJ_HTTP_TEST_SETUP_IO; 1402 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1403 1404 kj::Promise<void> readRequestsPromise = kj::READY_NOW; 1405 kj::Promise<void> writeResponsesPromise = kj::READY_NOW; 1406 for (auto& testCase: PIPELINE_TESTS) { 1407 auto forked = readRequestsPromise 1408 .then([&]() { 1409 return expectRead(*pipe.ends[1], testCase.request.raw); 1410 }).fork(); 1411 readRequestsPromise = forked.addBranch(); 1412 1413 // Don't write each response until the corresponding request is received. 1414 auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2); 1415 promises.add(forked.addBranch()); 1416 promises.add(kj::mv(writeResponsesPromise)); 1417 writeResponsesPromise = kj::joinPromises(promises.finish()).then([&]() { 1418 return pipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size()); 1419 }); 1420 } 1421 1422 HttpHeaderTable table; 1423 auto client = newHttpClient(table, *pipe.ends[0]); 1424 1425 auto responsePromises = KJ_MAP(testCase, PIPELINE_TESTS) { 1426 KJ_CONTEXT(testCase.request.raw, testCase.response.raw); 1427 1428 HttpHeaders headers(table); 1429 for (auto& header: testCase.request.requestHeaders) { 1430 headers.set(header.id, header.value); 1431 } 1432 1433 auto request = client->request( 1434 testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize); 1435 for (auto& part: testCase.request.requestBodyParts) { 1436 request.body->write(part.begin(), part.size()).wait(waitScope); 1437 } 1438 1439 return kj::mv(request.response); 1440 }; 1441 1442 for (auto i: kj::indices(PIPELINE_TESTS)) { 1443 auto& testCase = PIPELINE_TESTS[i]; 1444 auto response = responsePromises[i].wait(waitScope); 1445 1446 KJ_EXPECT(response.statusCode == testCase.response.statusCode); 1447 auto body = response.body->readAllText().wait(waitScope); 1448 if (testCase.request.method == HttpMethod::HEAD) { 1449 KJ_EXPECT(body == ""); 1450 } else { 1451 KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body); 1452 } 1453 } 1454 1455 client = nullptr; 1456 pipe.ends[0]->shutdownWrite(); 1457 1458 writeResponsesPromise.wait(waitScope); 1459 } 1460 1461 KJ_TEST("HttpServer pipeline") { 1462 auto PIPELINE_TESTS = pipelineTestCases(); 1463 1464 KJ_HTTP_TEST_SETUP_IO; 1465 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 1466 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1467 1468 HttpHeaderTable table; 1469 TestHttpService service(PIPELINE_TESTS, table); 1470 HttpServer server(timer, table, service); 1471 1472 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 1473 1474 for (auto& testCase: PIPELINE_TESTS) { 1475 KJ_CONTEXT(testCase.request.raw, testCase.response.raw); 1476 1477 pipe.ends[1]->write(testCase.request.raw.begin(), testCase.request.raw.size()) 1478 .wait(waitScope); 1479 1480 expectRead(*pipe.ends[1], testCase.response.raw).wait(waitScope); 1481 } 1482 1483 pipe.ends[1]->shutdownWrite(); 1484 listenTask.wait(waitScope); 1485 1486 KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS)); 1487 } 1488 1489 KJ_TEST("HttpServer parallel pipeline") { 1490 auto PIPELINE_TESTS = pipelineTestCases(); 1491 1492 KJ_HTTP_TEST_SETUP_IO; 1493 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 1494 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1495 1496 auto allRequestText = 1497 kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.request.raw; }, ""); 1498 auto allResponseText = 1499 kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.response.raw; }, ""); 1500 1501 HttpHeaderTable table; 1502 TestHttpService service(PIPELINE_TESTS, table); 1503 HttpServer server(timer, table, service); 1504 1505 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 1506 1507 pipe.ends[1]->write(allRequestText.begin(), allRequestText.size()).wait(waitScope); 1508 pipe.ends[1]->shutdownWrite(); 1509 1510 auto rawResponse = pipe.ends[1]->readAllText().wait(waitScope); 1511 KJ_EXPECT(rawResponse == allResponseText, rawResponse); 1512 1513 listenTask.wait(waitScope); 1514 1515 KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS)); 1516 } 1517 1518 KJ_TEST("HttpClient <-> HttpServer") { 1519 auto PIPELINE_TESTS = pipelineTestCases(); 1520 1521 KJ_HTTP_TEST_SETUP_IO; 1522 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 1523 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1524 1525 HttpHeaderTable table; 1526 TestHttpService service(PIPELINE_TESTS, table); 1527 HttpServer server(timer, table, service); 1528 1529 auto listenTask = server.listenHttp(kj::mv(pipe.ends[1])); 1530 auto client = newHttpClient(table, *pipe.ends[0]); 1531 1532 for (auto& testCase: PIPELINE_TESTS) { 1533 testHttpClient(waitScope, table, *client, testCase); 1534 } 1535 1536 client = nullptr; 1537 pipe.ends[0]->shutdownWrite(); 1538 listenTask.wait(waitScope); 1539 KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS)); 1540 } 1541 1542 // ----------------------------------------------------------------------------- 1543 1544 KJ_TEST("HttpInputStream requests") { 1545 KJ_HTTP_TEST_SETUP_IO; 1546 1547 kj::HttpHeaderTable table; 1548 1549 auto pipe = kj::newOneWayPipe(); 1550 auto input = newHttpInputStream(*pipe.in, table); 1551 1552 kj::Promise<void> writeQueue = kj::READY_NOW; 1553 1554 for (auto& testCase: requestTestCases()) { 1555 writeQueue = writeQueue.then([&]() { 1556 return pipe.out->write(testCase.raw.begin(), testCase.raw.size()); 1557 }); 1558 } 1559 writeQueue = writeQueue.then([&]() { 1560 pipe.out = nullptr; 1561 }); 1562 1563 for (auto& testCase: requestTestCases()) { 1564 KJ_CONTEXT(testCase.raw); 1565 1566 KJ_ASSERT(input->awaitNextMessage().wait(waitScope)); 1567 1568 auto req = input->readRequest().wait(waitScope); 1569 KJ_EXPECT(req.method == testCase.method); 1570 KJ_EXPECT(req.url == testCase.path); 1571 for (auto& header: testCase.requestHeaders) { 1572 KJ_EXPECT(KJ_ASSERT_NONNULL(req.headers.get(header.id)) == header.value); 1573 } 1574 auto body = req.body->readAllText().wait(waitScope); 1575 KJ_EXPECT(body == kj::strArray(testCase.requestBodyParts, "")); 1576 } 1577 1578 writeQueue.wait(waitScope); 1579 KJ_EXPECT(!input->awaitNextMessage().wait(waitScope)); 1580 } 1581 1582 KJ_TEST("HttpInputStream responses") { 1583 KJ_HTTP_TEST_SETUP_IO; 1584 1585 kj::HttpHeaderTable table; 1586 1587 auto pipe = kj::newOneWayPipe(); 1588 auto input = newHttpInputStream(*pipe.in, table); 1589 1590 kj::Promise<void> writeQueue = kj::READY_NOW; 1591 1592 for (auto& testCase: responseTestCases()) { 1593 if (testCase.side == CLIENT_ONLY) continue; // skip Connection: close case. 1594 writeQueue = writeQueue.then([&]() { 1595 return pipe.out->write(testCase.raw.begin(), testCase.raw.size()); 1596 }); 1597 } 1598 writeQueue = writeQueue.then([&]() { 1599 pipe.out = nullptr; 1600 }); 1601 1602 for (auto& testCase: responseTestCases()) { 1603 if (testCase.side == CLIENT_ONLY) continue; // skip Connection: close case. 1604 KJ_CONTEXT(testCase.raw); 1605 1606 KJ_ASSERT(input->awaitNextMessage().wait(waitScope)); 1607 1608 auto resp = input->readResponse(testCase.method).wait(waitScope); 1609 KJ_EXPECT(resp.statusCode == testCase.statusCode); 1610 KJ_EXPECT(resp.statusText == testCase.statusText); 1611 for (auto& header: testCase.responseHeaders) { 1612 KJ_EXPECT(KJ_ASSERT_NONNULL(resp.headers.get(header.id)) == header.value); 1613 } 1614 auto body = resp.body->readAllText().wait(waitScope); 1615 KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, "")); 1616 } 1617 1618 writeQueue.wait(waitScope); 1619 KJ_EXPECT(!input->awaitNextMessage().wait(waitScope)); 1620 } 1621 1622 KJ_TEST("HttpInputStream bare messages") { 1623 KJ_HTTP_TEST_SETUP_IO; 1624 1625 kj::HttpHeaderTable table; 1626 1627 auto pipe = kj::newOneWayPipe(); 1628 auto input = newHttpInputStream(*pipe.in, table); 1629 1630 kj::StringPtr messages = 1631 "Content-Length: 6\r\n" 1632 "\r\n" 1633 "foobar" 1634 "Content-Length: 11\r\n" 1635 "Content-Type: some/type\r\n" 1636 "\r\n" 1637 "bazquxcorge" 1638 "Transfer-Encoding: chunked\r\n" 1639 "\r\n" 1640 "6\r\n" 1641 "grault\r\n" 1642 "b\r\n" 1643 "garplywaldo\r\n" 1644 "0\r\n" 1645 "\r\n"_kj; 1646 1647 kj::Promise<void> writeTask = pipe.out->write(messages.begin(), messages.size()) 1648 .then([&]() { pipe.out = nullptr; }); 1649 1650 { 1651 KJ_ASSERT(input->awaitNextMessage().wait(waitScope)); 1652 auto message = input->readMessage().wait(waitScope); 1653 KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_LENGTH)) == "6"); 1654 KJ_EXPECT(message.body->readAllText().wait(waitScope) == "foobar"); 1655 } 1656 { 1657 KJ_ASSERT(input->awaitNextMessage().wait(waitScope)); 1658 auto message = input->readMessage().wait(waitScope); 1659 KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_LENGTH)) == "11"); 1660 KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_TYPE)) == "some/type"); 1661 KJ_EXPECT(message.body->readAllText().wait(waitScope) == "bazquxcorge"); 1662 } 1663 { 1664 KJ_ASSERT(input->awaitNextMessage().wait(waitScope)); 1665 auto message = input->readMessage().wait(waitScope); 1666 KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::TRANSFER_ENCODING)) == "chunked"); 1667 KJ_EXPECT(message.body->readAllText().wait(waitScope) == "graultgarplywaldo"); 1668 } 1669 1670 writeTask.wait(waitScope); 1671 KJ_EXPECT(!input->awaitNextMessage().wait(waitScope)); 1672 } 1673 1674 // ----------------------------------------------------------------------------- 1675 1676 KJ_TEST("WebSocket core protocol") { 1677 KJ_HTTP_TEST_SETUP_IO; 1678 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1679 1680 auto client = newWebSocket(kj::mv(pipe.ends[0]), nullptr); 1681 auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); 1682 1683 auto mediumString = kj::strArray(kj::repeat(kj::StringPtr("123456789"), 30), ""); 1684 auto bigString = kj::strArray(kj::repeat(kj::StringPtr("123456789"), 10000), ""); 1685 1686 auto clientTask = client->send(kj::StringPtr("hello")) 1687 .then([&]() { return client->send(mediumString); }) 1688 .then([&]() { return client->send(bigString); }) 1689 .then([&]() { return client->send(kj::StringPtr("world").asBytes()); }) 1690 .then([&]() { return client->close(1234, "bored"); }) 1691 .then([&]() { KJ_EXPECT(client->sentByteCount() == 90307)}); 1692 1693 { 1694 auto message = server->receive().wait(waitScope); 1695 KJ_ASSERT(message.is<kj::String>()); 1696 KJ_EXPECT(message.get<kj::String>() == "hello"); 1697 } 1698 1699 { 1700 auto message = server->receive().wait(waitScope); 1701 KJ_ASSERT(message.is<kj::String>()); 1702 KJ_EXPECT(message.get<kj::String>() == mediumString); 1703 } 1704 1705 { 1706 auto message = server->receive().wait(waitScope); 1707 KJ_ASSERT(message.is<kj::String>()); 1708 KJ_EXPECT(message.get<kj::String>() == bigString); 1709 } 1710 1711 { 1712 auto message = server->receive().wait(waitScope); 1713 KJ_ASSERT(message.is<kj::Array<byte>>()); 1714 KJ_EXPECT(kj::str(message.get<kj::Array<byte>>().asChars()) == "world"); 1715 } 1716 1717 { 1718 auto message = server->receive().wait(waitScope); 1719 KJ_ASSERT(message.is<WebSocket::Close>()); 1720 KJ_EXPECT(message.get<WebSocket::Close>().code == 1234); 1721 KJ_EXPECT(message.get<WebSocket::Close>().reason == "bored"); 1722 KJ_EXPECT(server->receivedByteCount() == 90307); 1723 } 1724 1725 auto serverTask = server->close(4321, "whatever"); 1726 1727 { 1728 auto message = client->receive().wait(waitScope); 1729 KJ_ASSERT(message.is<WebSocket::Close>()); 1730 KJ_EXPECT(message.get<WebSocket::Close>().code == 4321); 1731 KJ_EXPECT(message.get<WebSocket::Close>().reason == "whatever"); 1732 KJ_EXPECT(client->receivedByteCount() == 12); 1733 } 1734 1735 clientTask.wait(waitScope); 1736 serverTask.wait(waitScope); 1737 } 1738 1739 KJ_TEST("WebSocket fragmented") { 1740 KJ_HTTP_TEST_SETUP_IO; 1741 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1742 1743 auto client = kj::mv(pipe.ends[0]); 1744 auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); 1745 1746 byte DATA[] = { 1747 0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ', 1748 1749 0x00, 0x03, 'w', 'o', 'r', 1750 1751 0x80, 0x02, 'l', 'd', 1752 }; 1753 1754 auto clientTask = client->write(DATA, sizeof(DATA)); 1755 1756 { 1757 auto message = server->receive().wait(waitScope); 1758 KJ_ASSERT(message.is<kj::String>()); 1759 KJ_EXPECT(message.get<kj::String>() == "hello world"); 1760 } 1761 1762 clientTask.wait(waitScope); 1763 } 1764 1765 class FakeEntropySource final: public EntropySource { 1766 public: 1767 void generate(kj::ArrayPtr<byte> buffer) override { 1768 static constexpr byte DUMMY[4] = { 12, 34, 56, 78 }; 1769 1770 for (auto i: kj::indices(buffer)) { 1771 buffer[i] = DUMMY[i % sizeof(DUMMY)]; 1772 } 1773 } 1774 }; 1775 1776 KJ_TEST("WebSocket masked") { 1777 KJ_HTTP_TEST_SETUP_IO; 1778 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1779 FakeEntropySource maskGenerator; 1780 1781 auto client = kj::mv(pipe.ends[0]); 1782 auto server = newWebSocket(kj::mv(pipe.ends[1]), maskGenerator); 1783 1784 byte DATA[] = { 1785 0x81, 0x86, 12, 34, 56, 78, 'h' ^ 12, 'e' ^ 34, 'l' ^ 56, 'l' ^ 78, 'o' ^ 12, ' ' ^ 34, 1786 }; 1787 1788 auto clientTask = client->write(DATA, sizeof(DATA)); 1789 auto serverTask = server->send(kj::StringPtr("hello ")); 1790 1791 { 1792 auto message = server->receive().wait(waitScope); 1793 KJ_ASSERT(message.is<kj::String>()); 1794 KJ_EXPECT(message.get<kj::String>() == "hello "); 1795 } 1796 1797 expectRead(*client, DATA).wait(waitScope); 1798 1799 clientTask.wait(waitScope); 1800 serverTask.wait(waitScope); 1801 } 1802 1803 KJ_TEST("WebSocket unsolicited pong") { 1804 KJ_HTTP_TEST_SETUP_IO; 1805 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1806 1807 auto client = kj::mv(pipe.ends[0]); 1808 auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); 1809 1810 byte DATA[] = { 1811 0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ', 1812 1813 0x8A, 0x03, 'f', 'o', 'o', 1814 1815 0x80, 0x05, 'w', 'o', 'r', 'l', 'd', 1816 }; 1817 1818 auto clientTask = client->write(DATA, sizeof(DATA)); 1819 1820 { 1821 auto message = server->receive().wait(waitScope); 1822 KJ_ASSERT(message.is<kj::String>()); 1823 KJ_EXPECT(message.get<kj::String>() == "hello world"); 1824 } 1825 1826 clientTask.wait(waitScope); 1827 } 1828 1829 KJ_TEST("WebSocket ping") { 1830 KJ_HTTP_TEST_SETUP_IO; 1831 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1832 1833 auto client = kj::mv(pipe.ends[0]); 1834 auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); 1835 1836 // Be extra-annoying by having the ping arrive between fragments. 1837 byte DATA[] = { 1838 0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ', 1839 1840 0x89, 0x03, 'f', 'o', 'o', 1841 1842 0x80, 0x05, 'w', 'o', 'r', 'l', 'd', 1843 }; 1844 1845 auto clientTask = client->write(DATA, sizeof(DATA)); 1846 1847 { 1848 auto message = server->receive().wait(waitScope); 1849 KJ_ASSERT(message.is<kj::String>()); 1850 KJ_EXPECT(message.get<kj::String>() == "hello world"); 1851 } 1852 1853 auto serverTask = server->send(kj::StringPtr("bar")); 1854 1855 byte EXPECTED[] = { 1856 0x8A, 0x03, 'f', 'o', 'o', // pong 1857 0x81, 0x03, 'b', 'a', 'r', // message 1858 }; 1859 1860 expectRead(*client, EXPECTED).wait(waitScope); 1861 1862 clientTask.wait(waitScope); 1863 serverTask.wait(waitScope); 1864 } 1865 1866 KJ_TEST("WebSocket ping mid-send") { 1867 KJ_HTTP_TEST_SETUP_IO; 1868 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1869 1870 auto client = kj::mv(pipe.ends[0]); 1871 auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); 1872 1873 auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), ""); 1874 auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr); 1875 1876 byte DATA[] = { 1877 0x89, 0x03, 'f', 'o', 'o', // ping 1878 0x81, 0x03, 'b', 'a', 'r', // some other message 1879 }; 1880 1881 auto clientTask = client->write(DATA, sizeof(DATA)); 1882 1883 { 1884 auto message = server->receive().wait(waitScope); 1885 KJ_ASSERT(message.is<kj::String>()); 1886 KJ_EXPECT(message.get<kj::String>() == "bar"); 1887 } 1888 1889 byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 }; 1890 expectRead(*client, EXPECTED1).wait(waitScope); 1891 expectRead(*client, bigString).wait(waitScope); 1892 1893 byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' }; 1894 expectRead(*client, EXPECTED2).wait(waitScope); 1895 1896 clientTask.wait(waitScope); 1897 serverTask.wait(waitScope); 1898 } 1899 1900 class InputOutputPair final: public kj::AsyncIoStream { 1901 // Creates an AsyncIoStream out of an AsyncInputStream and an AsyncOutputStream. 1902 1903 public: 1904 InputOutputPair(kj::Own<kj::AsyncInputStream> in, kj::Own<kj::AsyncOutputStream> out) 1905 : in(kj::mv(in)), out(kj::mv(out)) {} 1906 1907 kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override { 1908 return in->read(buffer, minBytes, maxBytes); 1909 } 1910 kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { 1911 return in->tryRead(buffer, minBytes, maxBytes); 1912 } 1913 1914 Maybe<uint64_t> tryGetLength() override { 1915 return in->tryGetLength(); 1916 } 1917 1918 Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount = kj::maxValue) override { 1919 return in->pumpTo(output, amount); 1920 } 1921 1922 kj::Promise<void> write(const void* buffer, size_t size) override { 1923 return out->write(buffer, size); 1924 } 1925 1926 kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override { 1927 return out->write(pieces); 1928 } 1929 1930 kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom( 1931 kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override { 1932 return out->tryPumpFrom(input, amount); 1933 } 1934 1935 Promise<void> whenWriteDisconnected() override { 1936 return out->whenWriteDisconnected(); 1937 } 1938 1939 void shutdownWrite() override { 1940 out = nullptr; 1941 } 1942 1943 private: 1944 kj::Own<kj::AsyncInputStream> in; 1945 kj::Own<kj::AsyncOutputStream> out; 1946 }; 1947 1948 KJ_TEST("WebSocket double-ping mid-send") { 1949 KJ_HTTP_TEST_SETUP_IO; 1950 1951 auto upPipe = newOneWayPipe(); 1952 auto downPipe = newOneWayPipe(); 1953 InputOutputPair client(kj::mv(downPipe.in), kj::mv(upPipe.out)); 1954 auto server = newWebSocket(kj::heap<InputOutputPair>(kj::mv(upPipe.in), kj::mv(downPipe.out)), 1955 nullptr); 1956 1957 auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), ""); 1958 auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr); 1959 1960 byte DATA[] = { 1961 0x89, 0x03, 'f', 'o', 'o', // ping 1962 0x89, 0x03, 'q', 'u', 'x', // ping2 1963 0x81, 0x03, 'b', 'a', 'r', // some other message 1964 }; 1965 1966 auto clientTask = client.write(DATA, sizeof(DATA)); 1967 1968 { 1969 auto message = server->receive().wait(waitScope); 1970 KJ_ASSERT(message.is<kj::String>()); 1971 KJ_EXPECT(message.get<kj::String>() == "bar"); 1972 } 1973 1974 byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 }; 1975 expectRead(client, EXPECTED1).wait(waitScope); 1976 expectRead(client, bigString).wait(waitScope); 1977 1978 byte EXPECTED2[] = { 0x8A, 0x03, 'q', 'u', 'x' }; 1979 expectRead(client, EXPECTED2).wait(waitScope); 1980 1981 clientTask.wait(waitScope); 1982 serverTask.wait(waitScope); 1983 } 1984 1985 KJ_TEST("WebSocket ping received during pong send") { 1986 KJ_HTTP_TEST_SETUP_IO; 1987 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 1988 1989 auto client = kj::mv(pipe.ends[0]); 1990 auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); 1991 1992 // Send a very large ping so that sending the pong takes a while. Then send a second ping 1993 // immediately after. 1994 byte PREFIX[] = { 0x89, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 }; 1995 auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), ""); 1996 byte POSTFIX[] = { 1997 0x89, 0x03, 'f', 'o', 'o', 1998 0x81, 0x03, 'b', 'a', 'r', 1999 }; 2000 2001 kj::ArrayPtr<const byte> parts[] = {PREFIX, bigString.asBytes(), POSTFIX}; 2002 auto clientTask = client->write(parts); 2003 2004 { 2005 auto message = server->receive().wait(waitScope); 2006 KJ_ASSERT(message.is<kj::String>()); 2007 KJ_EXPECT(message.get<kj::String>() == "bar"); 2008 } 2009 2010 byte EXPECTED1[] = { 0x8A, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 }; 2011 expectRead(*client, EXPECTED1).wait(waitScope); 2012 expectRead(*client, bigString).wait(waitScope); 2013 2014 byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' }; 2015 expectRead(*client, EXPECTED2).wait(waitScope); 2016 2017 clientTask.wait(waitScope); 2018 } 2019 2020 KJ_TEST("WebSocket pump byte counting") { 2021 KJ_HTTP_TEST_SETUP_IO; 2022 auto pipe1 = KJ_HTTP_TEST_CREATE_2PIPE; 2023 auto pipe2 = KJ_HTTP_TEST_CREATE_2PIPE; 2024 2025 FakeEntropySource maskGenerator; 2026 auto server1 = newWebSocket(kj::mv(pipe1.ends[1]), nullptr); 2027 auto client2 = newWebSocket(kj::mv(pipe2.ends[0]), maskGenerator); 2028 auto server2 = newWebSocket(kj::mv(pipe2.ends[1]), nullptr); 2029 2030 auto pumpTask = server1->pumpTo(*client2); 2031 auto receiveTask = server2->receive(); 2032 2033 // Client sends three bytes of a valid message then disconnects. 2034 const char DATA[] = {0x01, 0x06, 'h'}; 2035 pipe1.ends[0]->write(DATA, 3).wait(waitScope); 2036 pipe1.ends[0] = nullptr; 2037 2038 // The pump completes successfully, forwarding the disconnect. 2039 pumpTask.wait(waitScope); 2040 2041 // The eventual receiver gets a disconnect execption. 2042 // (Note: We don't use KJ_EXPECT_THROW here because under -fno-exceptions it forks and we lose 2043 // state.) 2044 receiveTask.then([](auto) { 2045 KJ_FAIL_EXPECT("expected exception"); 2046 }, [](kj::Exception&& e) { 2047 KJ_EXPECT(e.getType() == kj::Exception::Type::DISCONNECTED); 2048 }).wait(waitScope); 2049 2050 KJ_EXPECT(server1->receivedByteCount() == 3); 2051 #if KJ_NO_RTTI 2052 // Optimized socket pump will be disabled, so only whole messages are counted by client2/server2. 2053 KJ_EXPECT(client2->sentByteCount() == 0); 2054 KJ_EXPECT(server2->receivedByteCount() == 0); 2055 #else 2056 KJ_EXPECT(client2->sentByteCount() == 3); 2057 KJ_EXPECT(server2->receivedByteCount() == 3); 2058 #endif 2059 } 2060 2061 KJ_TEST("WebSocket pump disconnect on send") { 2062 KJ_HTTP_TEST_SETUP_IO; 2063 auto pipe1 = KJ_HTTP_TEST_CREATE_2PIPE; 2064 auto pipe2 = KJ_HTTP_TEST_CREATE_2PIPE; 2065 2066 FakeEntropySource maskGenerator; 2067 auto client1 = newWebSocket(kj::mv(pipe1.ends[0]), maskGenerator); 2068 auto server1 = newWebSocket(kj::mv(pipe1.ends[1]), nullptr); 2069 auto client2 = newWebSocket(kj::mv(pipe2.ends[0]), maskGenerator); 2070 2071 auto pumpTask = server1->pumpTo(*client2); 2072 auto sendTask = client1->send("hello"_kj); 2073 2074 // Endpoint reads three bytes and then disconnects. 2075 char buffer[3]; 2076 pipe2.ends[1]->read(buffer, 3).wait(waitScope); 2077 pipe2.ends[1] = nullptr; 2078 2079 // Pump throws disconnected. 2080 KJ_EXPECT_THROW_RECOVERABLE(DISCONNECTED, pumpTask.wait(waitScope)); 2081 2082 // client1 may or may not have been able to send its whole message depending on buffering. 2083 sendTask.then([]() {}, [](kj::Exception&& e) { 2084 KJ_EXPECT(e.getType() == kj::Exception::Type::DISCONNECTED); 2085 }).wait(waitScope); 2086 } 2087 2088 KJ_TEST("WebSocket pump disconnect on receive") { 2089 KJ_HTTP_TEST_SETUP_IO; 2090 auto pipe1 = KJ_HTTP_TEST_CREATE_2PIPE; 2091 auto pipe2 = KJ_HTTP_TEST_CREATE_2PIPE; 2092 2093 FakeEntropySource maskGenerator; 2094 auto server1 = newWebSocket(kj::mv(pipe1.ends[1]), nullptr); 2095 auto client2 = newWebSocket(kj::mv(pipe2.ends[0]), maskGenerator); 2096 auto server2 = newWebSocket(kj::mv(pipe2.ends[1]), nullptr); 2097 2098 auto pumpTask = server1->pumpTo(*client2); 2099 auto receiveTask = server2->receive(); 2100 2101 // Client sends three bytes of a valid message then disconnects. 2102 const char DATA[] = {0x01, 0x06, 'h'}; 2103 pipe1.ends[0]->write(DATA, 3).wait(waitScope); 2104 pipe1.ends[0] = nullptr; 2105 2106 // The pump completes successfully, forwarding the disconnect. 2107 pumpTask.wait(waitScope); 2108 2109 // The eventual receiver gets a disconnect execption. 2110 KJ_EXPECT_THROW(DISCONNECTED, receiveTask.wait(waitScope)); 2111 } 2112 2113 class TestWebSocketService final: public HttpService, private kj::TaskSet::ErrorHandler { 2114 public: 2115 TestWebSocketService(HttpHeaderTable& headerTable, HttpHeaderId hMyHeader) 2116 : headerTable(headerTable), hMyHeader(hMyHeader), tasks(*this) {} 2117 2118 kj::Promise<void> request( 2119 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 2120 kj::AsyncInputStream& requestBody, Response& response) override { 2121 KJ_ASSERT(headers.isWebSocket()); 2122 2123 HttpHeaders responseHeaders(headerTable); 2124 KJ_IF_MAYBE(h, headers.get(hMyHeader)) { 2125 responseHeaders.set(hMyHeader, kj::str("respond-", *h)); 2126 } 2127 2128 if (url == "/return-error") { 2129 response.send(404, "Not Found", responseHeaders, uint64_t(0)); 2130 return kj::READY_NOW; 2131 } else if (url == "/websocket") { 2132 auto ws = response.acceptWebSocket(responseHeaders); 2133 return doWebSocket(*ws, "start-inline").attach(kj::mv(ws)); 2134 } else { 2135 KJ_FAIL_ASSERT("unexpected path", url); 2136 } 2137 } 2138 2139 private: 2140 HttpHeaderTable& headerTable; 2141 HttpHeaderId hMyHeader; 2142 kj::TaskSet tasks; 2143 2144 void taskFailed(kj::Exception&& exception) override { 2145 KJ_LOG(ERROR, exception); 2146 } 2147 2148 static kj::Promise<void> doWebSocket(WebSocket& ws, kj::StringPtr message) { 2149 auto copy = kj::str(message); 2150 return ws.send(copy).attach(kj::mv(copy)) 2151 .then([&ws]() { 2152 return ws.receive(); 2153 }).then([&ws](WebSocket::Message&& message) { 2154 KJ_SWITCH_ONEOF(message) { 2155 KJ_CASE_ONEOF(str, kj::String) { 2156 return doWebSocket(ws, kj::str("reply:", str)); 2157 } 2158 KJ_CASE_ONEOF(data, kj::Array<byte>) { 2159 return doWebSocket(ws, kj::str("reply:", data)); 2160 } 2161 KJ_CASE_ONEOF(close, WebSocket::Close) { 2162 auto reason = kj::str("close-reply:", close.reason); 2163 return ws.close(close.code + 1, reason).attach(kj::mv(reason)); 2164 } 2165 } 2166 KJ_UNREACHABLE; 2167 }); 2168 } 2169 }; 2170 2171 const char WEBSOCKET_REQUEST_HANDSHAKE[] = 2172 " HTTP/1.1\r\n" 2173 "Connection: Upgrade\r\n" 2174 "Upgrade: websocket\r\n" 2175 "Sec-WebSocket-Key: DCI4TgwiOE4MIjhODCI4Tg==\r\n" 2176 "Sec-WebSocket-Version: 13\r\n" 2177 "My-Header: foo\r\n" 2178 "\r\n"; 2179 const char WEBSOCKET_RESPONSE_HANDSHAKE[] = 2180 "HTTP/1.1 101 Switching Protocols\r\n" 2181 "Connection: Upgrade\r\n" 2182 "Upgrade: websocket\r\n" 2183 "Sec-WebSocket-Accept: pShtIFKT0s8RYZvnWY/CrjQD8CM=\r\n" 2184 "My-Header: respond-foo\r\n" 2185 "\r\n"; 2186 const char WEBSOCKET_RESPONSE_HANDSHAKE_ERROR[] = 2187 "HTTP/1.1 404 Not Found\r\n" 2188 "Content-Length: 0\r\n" 2189 "My-Header: respond-foo\r\n" 2190 "\r\n"; 2191 const byte WEBSOCKET_FIRST_MESSAGE_INLINE[] = 2192 { 0x81, 0x0c, 's','t','a','r','t','-','i','n','l','i','n','e' }; 2193 const byte WEBSOCKET_SEND_MESSAGE[] = 2194 { 0x81, 0x83, 12, 34, 56, 78, 'b'^12, 'a'^34, 'r'^56 }; 2195 const byte WEBSOCKET_REPLY_MESSAGE[] = 2196 { 0x81, 0x09, 'r','e','p','l','y',':','b','a','r' }; 2197 const byte WEBSOCKET_SEND_CLOSE[] = 2198 { 0x88, 0x85, 12, 34, 56, 78, 0x12^12, 0x34^34, 'q'^56, 'u'^78, 'x'^12 }; 2199 const byte WEBSOCKET_REPLY_CLOSE[] = 2200 { 0x88, 0x11, 0x12, 0x35, 'c','l','o','s','e','-','r','e','p','l','y',':','q','u','x' }; 2201 2202 template <size_t s> 2203 kj::ArrayPtr<const byte> asBytes(const char (&chars)[s]) { 2204 return kj::ArrayPtr<const char>(chars, s - 1).asBytes(); 2205 } 2206 2207 void testWebSocketClient(kj::WaitScope& waitScope, HttpHeaderTable& headerTable, 2208 kj::HttpHeaderId hMyHeader, HttpClient& client) { 2209 kj::HttpHeaders headers(headerTable); 2210 headers.set(hMyHeader, "foo"); 2211 auto response = client.openWebSocket("/websocket", headers).wait(waitScope); 2212 2213 KJ_EXPECT(response.statusCode == 101); 2214 KJ_EXPECT(response.statusText == "Switching Protocols", response.statusText); 2215 KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo"); 2216 KJ_ASSERT(response.webSocketOrBody.is<kj::Own<WebSocket>>()); 2217 auto ws = kj::mv(response.webSocketOrBody.get<kj::Own<WebSocket>>()); 2218 2219 { 2220 auto message = ws->receive().wait(waitScope); 2221 KJ_ASSERT(message.is<kj::String>()); 2222 KJ_EXPECT(message.get<kj::String>() == "start-inline"); 2223 } 2224 2225 ws->send(kj::StringPtr("bar")).wait(waitScope); 2226 { 2227 auto message = ws->receive().wait(waitScope); 2228 KJ_ASSERT(message.is<kj::String>()); 2229 KJ_EXPECT(message.get<kj::String>() == "reply:bar"); 2230 } 2231 2232 ws->close(0x1234, "qux").wait(waitScope); 2233 { 2234 auto message = ws->receive().wait(waitScope); 2235 KJ_ASSERT(message.is<WebSocket::Close>()); 2236 KJ_EXPECT(message.get<WebSocket::Close>().code == 0x1235); 2237 KJ_EXPECT(message.get<WebSocket::Close>().reason == "close-reply:qux"); 2238 } 2239 } 2240 2241 inline kj::Promise<void> writeA(kj::AsyncOutputStream& out, kj::ArrayPtr<const byte> data) { 2242 return out.write(data.begin(), data.size()); 2243 } 2244 2245 KJ_TEST("HttpClient WebSocket handshake") { 2246 KJ_HTTP_TEST_SETUP_IO; 2247 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2248 2249 auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE); 2250 2251 auto serverTask = expectRead(*pipe.ends[1], request) 2252 .then([&]() { return writeA(*pipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)); }) 2253 .then([&]() { return writeA(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE); }) 2254 .then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE); }) 2255 .then([&]() { return writeA(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE); }) 2256 .then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_CLOSE); }) 2257 .then([&]() { return writeA(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE); }) 2258 .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); }); 2259 2260 HttpHeaderTable::Builder tableBuilder; 2261 HttpHeaderId hMyHeader = tableBuilder.add("My-Header"); 2262 auto headerTable = tableBuilder.build(); 2263 2264 FakeEntropySource entropySource; 2265 HttpClientSettings clientSettings; 2266 clientSettings.entropySource = entropySource; 2267 2268 auto client = newHttpClient(*headerTable, *pipe.ends[0], clientSettings); 2269 2270 testWebSocketClient(waitScope, *headerTable, hMyHeader, *client); 2271 2272 serverTask.wait(waitScope); 2273 } 2274 2275 KJ_TEST("HttpClient WebSocket error") { 2276 KJ_HTTP_TEST_SETUP_IO; 2277 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2278 2279 auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE); 2280 2281 auto serverTask = expectRead(*pipe.ends[1], request) 2282 .then([&]() { return writeA(*pipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE_ERROR)); }) 2283 .then([&]() { return expectRead(*pipe.ends[1], request); }) 2284 .then([&]() { return writeA(*pipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE_ERROR)); }) 2285 .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); }); 2286 2287 HttpHeaderTable::Builder tableBuilder; 2288 HttpHeaderId hMyHeader = tableBuilder.add("My-Header"); 2289 auto headerTable = tableBuilder.build(); 2290 2291 FakeEntropySource entropySource; 2292 HttpClientSettings clientSettings; 2293 clientSettings.entropySource = entropySource; 2294 2295 auto client = newHttpClient(*headerTable, *pipe.ends[0], clientSettings); 2296 2297 kj::HttpHeaders headers(*headerTable); 2298 headers.set(hMyHeader, "foo"); 2299 2300 { 2301 auto response = client->openWebSocket("/websocket", headers).wait(waitScope); 2302 2303 KJ_EXPECT(response.statusCode == 404); 2304 KJ_EXPECT(response.statusText == "Not Found", response.statusText); 2305 KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo"); 2306 KJ_ASSERT(response.webSocketOrBody.is<kj::Own<AsyncInputStream>>()); 2307 } 2308 2309 { 2310 auto response = client->openWebSocket("/websocket", headers).wait(waitScope); 2311 2312 KJ_EXPECT(response.statusCode == 404); 2313 KJ_EXPECT(response.statusText == "Not Found", response.statusText); 2314 KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo"); 2315 KJ_ASSERT(response.webSocketOrBody.is<kj::Own<AsyncInputStream>>()); 2316 } 2317 2318 serverTask.wait(waitScope); 2319 } 2320 2321 KJ_TEST("HttpServer WebSocket handshake") { 2322 KJ_HTTP_TEST_SETUP_IO; 2323 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2324 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2325 2326 HttpHeaderTable::Builder tableBuilder; 2327 HttpHeaderId hMyHeader = tableBuilder.add("My-Header"); 2328 auto headerTable = tableBuilder.build(); 2329 TestWebSocketService service(*headerTable, hMyHeader); 2330 HttpServer server(timer, *headerTable, service); 2331 2332 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2333 2334 auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE); 2335 writeA(*pipe.ends[1], request.asBytes()).wait(waitScope); 2336 expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope); 2337 2338 expectRead(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope); 2339 writeA(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE).wait(waitScope); 2340 expectRead(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE).wait(waitScope); 2341 writeA(*pipe.ends[1], WEBSOCKET_SEND_CLOSE).wait(waitScope); 2342 expectRead(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE).wait(waitScope); 2343 2344 listenTask.wait(waitScope); 2345 } 2346 2347 KJ_TEST("HttpServer WebSocket handshake error") { 2348 KJ_HTTP_TEST_SETUP_IO; 2349 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2350 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2351 2352 HttpHeaderTable::Builder tableBuilder; 2353 HttpHeaderId hMyHeader = tableBuilder.add("My-Header"); 2354 auto headerTable = tableBuilder.build(); 2355 TestWebSocketService service(*headerTable, hMyHeader); 2356 HttpServer server(timer, *headerTable, service); 2357 2358 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2359 2360 auto request = kj::str("GET /return-error", WEBSOCKET_REQUEST_HANDSHAKE); 2361 writeA(*pipe.ends[1], request.asBytes()).wait(waitScope); 2362 expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(waitScope); 2363 2364 // Can send more requests! 2365 writeA(*pipe.ends[1], request.asBytes()).wait(waitScope); 2366 expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(waitScope); 2367 2368 pipe.ends[1]->shutdownWrite(); 2369 2370 listenTask.wait(waitScope); 2371 } 2372 2373 // ----------------------------------------------------------------------------- 2374 2375 KJ_TEST("HttpServer request timeout") { 2376 auto PIPELINE_TESTS = pipelineTestCases(); 2377 2378 KJ_HTTP_TEST_SETUP_IO; 2379 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2380 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2381 2382 HttpHeaderTable table; 2383 TestHttpService service(PIPELINE_TESTS, table); 2384 HttpServerSettings settings; 2385 settings.headerTimeout = 1 * kj::MILLISECONDS; 2386 HttpServer server(timer, table, service, settings); 2387 2388 // Shouldn't hang! Should time out. 2389 auto promise = server.listenHttp(kj::mv(pipe.ends[0])); 2390 KJ_EXPECT(!promise.poll(waitScope)); 2391 timer.advanceTo(timer.now() + settings.headerTimeout / 2); 2392 KJ_EXPECT(!promise.poll(waitScope)); 2393 timer.advanceTo(timer.now() + settings.headerTimeout); 2394 promise.wait(waitScope); 2395 2396 // Closes the connection without sending anything. 2397 KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == ""); 2398 } 2399 2400 KJ_TEST("HttpServer pipeline timeout") { 2401 auto PIPELINE_TESTS = pipelineTestCases(); 2402 2403 KJ_HTTP_TEST_SETUP_IO; 2404 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2405 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2406 2407 HttpHeaderTable table; 2408 TestHttpService service(PIPELINE_TESTS, table); 2409 HttpServerSettings settings; 2410 settings.pipelineTimeout = 1 * kj::MILLISECONDS; 2411 HttpServer server(timer, table, service, settings); 2412 2413 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2414 2415 // Do one request. 2416 pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) 2417 .wait(waitScope); 2418 expectRead(*pipe.ends[1], PIPELINE_TESTS[0].response.raw).wait(waitScope); 2419 2420 // Listen task should time out even though we didn't shutdown the socket. 2421 KJ_EXPECT(!listenTask.poll(waitScope)); 2422 timer.advanceTo(timer.now() + settings.pipelineTimeout / 2); 2423 KJ_EXPECT(!listenTask.poll(waitScope)); 2424 timer.advanceTo(timer.now() + settings.pipelineTimeout); 2425 listenTask.wait(waitScope); 2426 2427 // In this case, no data is sent back. 2428 KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == ""); 2429 } 2430 2431 class BrokenHttpService final: public HttpService { 2432 // HttpService that doesn't send a response. 2433 public: 2434 BrokenHttpService() = default; 2435 explicit BrokenHttpService(kj::Exception&& exception): exception(kj::mv(exception)) {} 2436 2437 kj::Promise<void> request( 2438 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 2439 kj::AsyncInputStream& requestBody, Response& responseSender) override { 2440 return requestBody.readAllBytes().then([this](kj::Array<byte>&&) -> kj::Promise<void> { 2441 KJ_IF_MAYBE(e, exception) { 2442 return kj::cp(*e); 2443 } else { 2444 return kj::READY_NOW; 2445 } 2446 }); 2447 } 2448 2449 private: 2450 kj::Maybe<kj::Exception> exception; 2451 }; 2452 2453 KJ_TEST("HttpServer no response") { 2454 auto PIPELINE_TESTS = pipelineTestCases(); 2455 2456 KJ_HTTP_TEST_SETUP_IO; 2457 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2458 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2459 2460 HttpHeaderTable table; 2461 BrokenHttpService service; 2462 HttpServer server(timer, table, service); 2463 2464 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2465 2466 // Do one request. 2467 pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) 2468 .wait(waitScope); 2469 auto text = pipe.ends[1]->readAllText().wait(waitScope); 2470 2471 KJ_EXPECT(text == 2472 "HTTP/1.1 500 Internal Server Error\r\n" 2473 "Connection: close\r\n" 2474 "Content-Length: 51\r\n" 2475 "Content-Type: text/plain\r\n" 2476 "\r\n" 2477 "ERROR: The HttpService did not generate a response.", text); 2478 } 2479 2480 KJ_TEST("HttpServer disconnected") { 2481 auto PIPELINE_TESTS = pipelineTestCases(); 2482 2483 KJ_HTTP_TEST_SETUP_IO; 2484 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2485 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2486 2487 HttpHeaderTable table; 2488 BrokenHttpService service(KJ_EXCEPTION(DISCONNECTED, "disconnected")); 2489 HttpServer server(timer, table, service); 2490 2491 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2492 2493 // Do one request. 2494 pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) 2495 .wait(waitScope); 2496 auto text = pipe.ends[1]->readAllText().wait(waitScope); 2497 2498 KJ_EXPECT(text == "", text); 2499 } 2500 2501 KJ_TEST("HttpServer overloaded") { 2502 auto PIPELINE_TESTS = pipelineTestCases(); 2503 2504 KJ_HTTP_TEST_SETUP_IO; 2505 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2506 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2507 2508 HttpHeaderTable table; 2509 BrokenHttpService service(KJ_EXCEPTION(OVERLOADED, "overloaded")); 2510 HttpServer server(timer, table, service); 2511 2512 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2513 2514 // Do one request. 2515 pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) 2516 .wait(waitScope); 2517 auto text = pipe.ends[1]->readAllText().wait(waitScope); 2518 2519 KJ_EXPECT(text.startsWith("HTTP/1.1 503 Service Unavailable"), text); 2520 } 2521 2522 KJ_TEST("HttpServer unimplemented") { 2523 auto PIPELINE_TESTS = pipelineTestCases(); 2524 2525 KJ_HTTP_TEST_SETUP_IO; 2526 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2527 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2528 2529 HttpHeaderTable table; 2530 BrokenHttpService service(KJ_EXCEPTION(UNIMPLEMENTED, "unimplemented")); 2531 HttpServer server(timer, table, service); 2532 2533 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2534 2535 // Do one request. 2536 pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) 2537 .wait(waitScope); 2538 auto text = pipe.ends[1]->readAllText().wait(waitScope); 2539 2540 KJ_EXPECT(text.startsWith("HTTP/1.1 501 Not Implemented"), text); 2541 } 2542 2543 KJ_TEST("HttpServer threw exception") { 2544 auto PIPELINE_TESTS = pipelineTestCases(); 2545 2546 KJ_HTTP_TEST_SETUP_IO; 2547 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2548 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2549 2550 HttpHeaderTable table; 2551 BrokenHttpService service(KJ_EXCEPTION(FAILED, "failed")); 2552 HttpServer server(timer, table, service); 2553 2554 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2555 2556 // Do one request. 2557 pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) 2558 .wait(waitScope); 2559 auto text = pipe.ends[1]->readAllText().wait(waitScope); 2560 2561 KJ_EXPECT(text.startsWith("HTTP/1.1 500 Internal Server Error"), text); 2562 } 2563 2564 KJ_TEST("HttpServer bad request") { 2565 KJ_HTTP_TEST_SETUP_IO; 2566 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2567 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2568 2569 HttpHeaderTable table; 2570 BrokenHttpService service; 2571 HttpServer server(timer, table, service); 2572 2573 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2574 2575 static constexpr auto request = "GET / HTTP/1.1\r\nbad request\r\n\r\n"_kj; 2576 auto writePromise = pipe.ends[1]->write(request.begin(), request.size()); 2577 auto response = pipe.ends[1]->readAllText().wait(waitScope); 2578 KJ_EXPECT(writePromise.poll(waitScope)); 2579 writePromise.wait(waitScope); 2580 2581 static constexpr auto expectedResponse = 2582 "HTTP/1.1 400 Bad Request\r\n" 2583 "Connection: close\r\n" 2584 "Content-Length: 53\r\n" 2585 "Content-Type: text/plain\r\n" 2586 "\r\n" 2587 "ERROR: The headers sent by your client are not valid."_kj; 2588 2589 KJ_EXPECT(expectedResponse == response, expectedResponse, response); 2590 } 2591 2592 KJ_TEST("HttpServer invalid method") { 2593 KJ_HTTP_TEST_SETUP_IO; 2594 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2595 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2596 2597 HttpHeaderTable table; 2598 BrokenHttpService service; 2599 HttpServer server(timer, table, service); 2600 2601 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2602 2603 static constexpr auto request = "bad request\r\n\r\n"_kj; 2604 auto writePromise = pipe.ends[1]->write(request.begin(), request.size()); 2605 auto response = pipe.ends[1]->readAllText().wait(waitScope); 2606 KJ_EXPECT(writePromise.poll(waitScope)); 2607 writePromise.wait(waitScope); 2608 2609 static constexpr auto expectedResponse = 2610 "HTTP/1.1 501 Not Implemented\r\n" 2611 "Connection: close\r\n" 2612 "Content-Length: 35\r\n" 2613 "Content-Type: text/plain\r\n" 2614 "\r\n" 2615 "ERROR: Unrecognized request method."_kj; 2616 2617 KJ_EXPECT(expectedResponse == response, expectedResponse, response); 2618 } 2619 2620 // Ensure that HttpServerSettings can continue to be constexpr. 2621 KJ_UNUSED static constexpr HttpServerSettings STATIC_CONSTEXPR_SETTINGS {}; 2622 2623 class TestErrorHandler: public HttpServerErrorHandler { 2624 public: 2625 kj::Promise<void> handleClientProtocolError( 2626 HttpHeaders::ProtocolError protocolError, kj::HttpService::Response& response) override { 2627 // In a real error handler, you should redact `protocolError.rawContent`. 2628 auto message = kj::str("Saw protocol error: ", protocolError.description, "; rawContent = ", 2629 encodeCEscape(protocolError.rawContent)); 2630 return sendError(400, "Bad Request", kj::mv(message), response); 2631 } 2632 2633 kj::Promise<void> handleApplicationError( 2634 kj::Exception exception, kj::Maybe<kj::HttpService::Response&> response) override { 2635 return sendError(500, "Internal Server Error", 2636 kj::str("Saw application error: ", exception.getDescription()), response); 2637 } 2638 2639 kj::Promise<void> handleNoResponse(kj::HttpService::Response& response) override { 2640 return sendError(500, "Internal Server Error", kj::str("Saw no response."), response); 2641 } 2642 2643 static TestErrorHandler instance; 2644 2645 private: 2646 kj::Promise<void> sendError(uint statusCode, kj::StringPtr statusText, String message, 2647 Maybe<HttpService::Response&> response) { 2648 KJ_IF_MAYBE(r, response) { 2649 HttpHeaderTable headerTable; 2650 HttpHeaders headers(headerTable); 2651 auto body = r->send(statusCode, statusText, headers, message.size()); 2652 return body->write(message.begin(), message.size()).attach(kj::mv(body), kj::mv(message)); 2653 } else { 2654 KJ_LOG(ERROR, "Saw an error but too late to report to client."); 2655 return kj::READY_NOW; 2656 } 2657 } 2658 }; 2659 2660 TestErrorHandler TestErrorHandler::instance {}; 2661 2662 KJ_TEST("HttpServer no response, custom error handler") { 2663 auto PIPELINE_TESTS = pipelineTestCases(); 2664 2665 KJ_HTTP_TEST_SETUP_IO; 2666 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2667 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2668 2669 HttpServerSettings settings {}; 2670 settings.errorHandler = TestErrorHandler::instance; 2671 2672 HttpHeaderTable table; 2673 BrokenHttpService service; 2674 HttpServer server(timer, table, service, settings); 2675 2676 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2677 2678 // Do one request. 2679 pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) 2680 .wait(waitScope); 2681 auto text = pipe.ends[1]->readAllText().wait(waitScope); 2682 2683 KJ_EXPECT(text == 2684 "HTTP/1.1 500 Internal Server Error\r\n" 2685 "Connection: close\r\n" 2686 "Content-Length: 16\r\n" 2687 "\r\n" 2688 "Saw no response.", text); 2689 } 2690 2691 KJ_TEST("HttpServer threw exception, custom error handler") { 2692 auto PIPELINE_TESTS = pipelineTestCases(); 2693 2694 KJ_HTTP_TEST_SETUP_IO; 2695 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2696 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2697 2698 HttpServerSettings settings {}; 2699 settings.errorHandler = TestErrorHandler::instance; 2700 2701 HttpHeaderTable table; 2702 BrokenHttpService service(KJ_EXCEPTION(FAILED, "failed")); 2703 HttpServer server(timer, table, service, settings); 2704 2705 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2706 2707 // Do one request. 2708 pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) 2709 .wait(waitScope); 2710 auto text = pipe.ends[1]->readAllText().wait(waitScope); 2711 2712 KJ_EXPECT(text == 2713 "HTTP/1.1 500 Internal Server Error\r\n" 2714 "Connection: close\r\n" 2715 "Content-Length: 29\r\n" 2716 "\r\n" 2717 "Saw application error: failed", text); 2718 } 2719 2720 KJ_TEST("HttpServer bad request, custom error handler") { 2721 KJ_HTTP_TEST_SETUP_IO; 2722 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2723 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2724 2725 HttpServerSettings settings {}; 2726 settings.errorHandler = TestErrorHandler::instance; 2727 2728 HttpHeaderTable table; 2729 BrokenHttpService service; 2730 HttpServer server(timer, table, service, settings); 2731 2732 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2733 2734 static constexpr auto request = "bad request\r\n\r\n"_kj; 2735 auto writePromise = pipe.ends[1]->write(request.begin(), request.size()); 2736 auto response = pipe.ends[1]->readAllText().wait(waitScope); 2737 KJ_EXPECT(writePromise.poll(waitScope)); 2738 writePromise.wait(waitScope); 2739 2740 static constexpr auto expectedResponse = 2741 "HTTP/1.1 400 Bad Request\r\n" 2742 "Connection: close\r\n" 2743 "Content-Length: 80\r\n" 2744 "\r\n" 2745 "Saw protocol error: Unrecognized request method.; " 2746 "rawContent = bad request\\000\\n"_kj; 2747 2748 KJ_EXPECT(expectedResponse == response, expectedResponse, response); 2749 } 2750 2751 class PartialResponseService final: public HttpService { 2752 // HttpService that sends a partial response then throws. 2753 public: 2754 kj::Promise<void> request( 2755 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 2756 kj::AsyncInputStream& requestBody, Response& response) override { 2757 return requestBody.readAllBytes() 2758 .then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> { 2759 HttpHeaders headers(table); 2760 auto body = response.send(200, "OK", headers, 32); 2761 auto promise = body->write("foo", 3); 2762 return promise.attach(kj::mv(body)).then([]() -> kj::Promise<void> { 2763 return KJ_EXCEPTION(FAILED, "failed"); 2764 }); 2765 }); 2766 } 2767 2768 private: 2769 kj::Maybe<kj::Exception> exception; 2770 HttpHeaderTable table; 2771 }; 2772 2773 KJ_TEST("HttpServer threw exception after starting response") { 2774 auto PIPELINE_TESTS = pipelineTestCases(); 2775 2776 KJ_HTTP_TEST_SETUP_IO; 2777 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2778 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2779 2780 HttpHeaderTable table; 2781 PartialResponseService service; 2782 HttpServer server(timer, table, service); 2783 2784 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2785 2786 KJ_EXPECT_LOG(ERROR, "HttpService threw exception after generating a partial response"); 2787 2788 // Do one request. 2789 pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) 2790 .wait(waitScope); 2791 auto text = pipe.ends[1]->readAllText().wait(waitScope); 2792 2793 KJ_EXPECT(text == 2794 "HTTP/1.1 200 OK\r\n" 2795 "Content-Length: 32\r\n" 2796 "\r\n" 2797 "foo", text); 2798 } 2799 2800 class PartialResponseNoThrowService final: public HttpService { 2801 // HttpService that sends a partial response then returns without throwing. 2802 public: 2803 kj::Promise<void> request( 2804 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 2805 kj::AsyncInputStream& requestBody, Response& response) override { 2806 return requestBody.readAllBytes() 2807 .then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> { 2808 HttpHeaders headers(table); 2809 auto body = response.send(200, "OK", headers, 32); 2810 auto promise = body->write("foo", 3); 2811 return promise.attach(kj::mv(body)); 2812 }); 2813 } 2814 2815 private: 2816 kj::Maybe<kj::Exception> exception; 2817 HttpHeaderTable table; 2818 }; 2819 2820 KJ_TEST("HttpServer failed to write complete response but didn't throw") { 2821 auto PIPELINE_TESTS = pipelineTestCases(); 2822 2823 KJ_HTTP_TEST_SETUP_IO; 2824 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2825 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2826 2827 HttpHeaderTable table; 2828 PartialResponseNoThrowService service; 2829 HttpServer server(timer, table, service); 2830 2831 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2832 2833 // Do one request. 2834 pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) 2835 .wait(waitScope); 2836 auto text = pipe.ends[1]->readAllText().wait(waitScope); 2837 2838 KJ_EXPECT(text == 2839 "HTTP/1.1 200 OK\r\n" 2840 "Content-Length: 32\r\n" 2841 "\r\n" 2842 "foo", text); 2843 } 2844 2845 class SimpleInputStream final: public kj::AsyncInputStream { 2846 // An InputStream that returns bytes out of a static string. 2847 2848 public: 2849 SimpleInputStream(kj::StringPtr text) 2850 : unread(text.asBytes()) {} 2851 2852 kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { 2853 size_t amount = kj::min(maxBytes, unread.size()); 2854 memcpy(buffer, unread.begin(), amount); 2855 unread = unread.slice(amount, unread.size()); 2856 return amount; 2857 } 2858 2859 private: 2860 kj::ArrayPtr<const byte> unread; 2861 }; 2862 2863 class PumpResponseService final: public HttpService { 2864 // HttpService that uses pumpTo() to write a response, without carefully specifying how much to 2865 // pump, but the stream happens to be the right size. 2866 public: 2867 kj::Promise<void> request( 2868 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 2869 kj::AsyncInputStream& requestBody, Response& response) override { 2870 return requestBody.readAllBytes() 2871 .then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> { 2872 HttpHeaders headers(table); 2873 kj::StringPtr text = "Hello, World!"; 2874 auto body = response.send(200, "OK", headers, text.size()); 2875 2876 auto stream = kj::heap<SimpleInputStream>(text); 2877 auto promise = stream->pumpTo(*body); 2878 return promise.attach(kj::mv(body), kj::mv(stream)) 2879 .then([text](uint64_t amount) { 2880 KJ_EXPECT(amount == text.size()); 2881 }); 2882 }); 2883 } 2884 2885 private: 2886 kj::Maybe<kj::Exception> exception; 2887 HttpHeaderTable table; 2888 }; 2889 2890 KJ_TEST("HttpFixedLengthEntityWriter correctly implements tryPumpFrom") { 2891 auto PIPELINE_TESTS = pipelineTestCases(); 2892 2893 KJ_HTTP_TEST_SETUP_IO; 2894 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2895 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2896 2897 HttpHeaderTable table; 2898 PumpResponseService service; 2899 HttpServer server(timer, table, service); 2900 2901 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2902 2903 // Do one request. 2904 pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) 2905 .wait(waitScope); 2906 pipe.ends[1]->shutdownWrite(); 2907 auto text = pipe.ends[1]->readAllText().wait(waitScope); 2908 2909 KJ_EXPECT(text == 2910 "HTTP/1.1 200 OK\r\n" 2911 "Content-Length: 13\r\n" 2912 "\r\n" 2913 "Hello, World!", text); 2914 } 2915 2916 class HangingHttpService final: public HttpService { 2917 // HttpService that hangs forever. 2918 public: 2919 kj::Promise<void> request( 2920 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 2921 kj::AsyncInputStream& requestBody, Response& responseSender) override { 2922 kj::Promise<void> result = kj::NEVER_DONE; 2923 ++inFlight; 2924 return result.attach(kj::defer([this]() { 2925 if (--inFlight == 0) { 2926 KJ_IF_MAYBE(f, onCancelFulfiller) { 2927 f->get()->fulfill(); 2928 } 2929 } 2930 })); 2931 } 2932 2933 kj::Promise<void> onCancel() { 2934 auto paf = kj::newPromiseAndFulfiller<void>(); 2935 onCancelFulfiller = kj::mv(paf.fulfiller); 2936 return kj::mv(paf.promise); 2937 } 2938 2939 uint inFlight = 0; 2940 2941 private: 2942 kj::Maybe<kj::Exception> exception; 2943 kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> onCancelFulfiller; 2944 }; 2945 2946 KJ_TEST("HttpServer cancels request when client disconnects") { 2947 KJ_HTTP_TEST_SETUP_IO; 2948 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2949 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 2950 2951 HttpHeaderTable table; 2952 HangingHttpService service; 2953 HttpServer server(timer, table, service); 2954 2955 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 2956 2957 KJ_EXPECT(service.inFlight == 0); 2958 2959 static constexpr auto request = "GET / HTTP/1.1\r\n\r\n"_kj; 2960 pipe.ends[1]->write(request.begin(), request.size()).wait(waitScope); 2961 2962 auto cancelPromise = service.onCancel(); 2963 KJ_EXPECT(!cancelPromise.poll(waitScope)); 2964 KJ_EXPECT(service.inFlight == 1); 2965 2966 // Disconnect client and verify server cancels. 2967 pipe.ends[1] = nullptr; 2968 KJ_ASSERT(cancelPromise.poll(waitScope)); 2969 KJ_EXPECT(service.inFlight == 0); 2970 cancelPromise.wait(waitScope); 2971 } 2972 2973 // ----------------------------------------------------------------------------- 2974 2975 KJ_TEST("newHttpService from HttpClient") { 2976 auto PIPELINE_TESTS = pipelineTestCases(); 2977 2978 KJ_HTTP_TEST_SETUP_IO; 2979 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 2980 auto frontPipe = KJ_HTTP_TEST_CREATE_2PIPE; 2981 auto backPipe = KJ_HTTP_TEST_CREATE_2PIPE; 2982 2983 kj::Promise<void> writeResponsesPromise = kj::READY_NOW; 2984 for (auto& testCase: PIPELINE_TESTS) { 2985 writeResponsesPromise = writeResponsesPromise 2986 .then([&]() { 2987 return expectRead(*backPipe.ends[1], testCase.request.raw); 2988 }).then([&]() { 2989 return backPipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size()); 2990 }); 2991 } 2992 2993 { 2994 HttpHeaderTable table; 2995 auto backClient = newHttpClient(table, *backPipe.ends[0]); 2996 auto frontService = newHttpService(*backClient); 2997 HttpServer frontServer(timer, table, *frontService); 2998 auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1])); 2999 3000 for (auto& testCase: PIPELINE_TESTS) { 3001 KJ_CONTEXT(testCase.request.raw, testCase.response.raw); 3002 3003 frontPipe.ends[0]->write(testCase.request.raw.begin(), testCase.request.raw.size()) 3004 .wait(waitScope); 3005 3006 expectRead(*frontPipe.ends[0], testCase.response.raw).wait(waitScope); 3007 } 3008 3009 frontPipe.ends[0]->shutdownWrite(); 3010 listenTask.wait(waitScope); 3011 } 3012 3013 backPipe.ends[0]->shutdownWrite(); 3014 writeResponsesPromise.wait(waitScope); 3015 } 3016 3017 KJ_TEST("newHttpService from HttpClient WebSockets") { 3018 KJ_HTTP_TEST_SETUP_IO; 3019 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 3020 auto frontPipe = KJ_HTTP_TEST_CREATE_2PIPE; 3021 auto backPipe = KJ_HTTP_TEST_CREATE_2PIPE; 3022 3023 auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE); 3024 auto writeResponsesPromise = expectRead(*backPipe.ends[1], request) 3025 .then([&]() { return writeA(*backPipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)); }) 3026 .then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE); }) 3027 .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); }) 3028 .then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_REPLY_MESSAGE); }) 3029 .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_CLOSE); }) 3030 .then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_REPLY_CLOSE); }) 3031 .then([&]() { return expectEnd(*backPipe.ends[1]); }) 3032 .then([&]() { backPipe.ends[1]->shutdownWrite(); }) 3033 .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); }); 3034 3035 { 3036 HttpHeaderTable table; 3037 FakeEntropySource entropySource; 3038 HttpClientSettings clientSettings; 3039 clientSettings.entropySource = entropySource; 3040 auto backClientStream = kj::mv(backPipe.ends[0]); 3041 auto backClient = newHttpClient(table, *backClientStream, clientSettings); 3042 auto frontService = newHttpService(*backClient); 3043 HttpServer frontServer(timer, table, *frontService); 3044 auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1])); 3045 3046 writeA(*frontPipe.ends[0], request.asBytes()).wait(waitScope); 3047 expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope); 3048 3049 expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope); 3050 writeA(*frontPipe.ends[0], WEBSOCKET_SEND_MESSAGE).wait(waitScope); 3051 expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_MESSAGE).wait(waitScope); 3052 writeA(*frontPipe.ends[0], WEBSOCKET_SEND_CLOSE).wait(waitScope); 3053 expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_CLOSE).wait(waitScope); 3054 3055 frontPipe.ends[0]->shutdownWrite(); 3056 listenTask.wait(waitScope); 3057 } 3058 3059 writeResponsesPromise.wait(waitScope); 3060 } 3061 3062 KJ_TEST("newHttpService from HttpClient WebSockets disconnect") { 3063 KJ_HTTP_TEST_SETUP_IO; 3064 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 3065 auto frontPipe = KJ_HTTP_TEST_CREATE_2PIPE; 3066 auto backPipe = KJ_HTTP_TEST_CREATE_2PIPE; 3067 3068 auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE); 3069 auto writeResponsesPromise = expectRead(*backPipe.ends[1], request) 3070 .then([&]() { return writeA(*backPipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)); }) 3071 .then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE); }) 3072 .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); }) 3073 .then([&]() { backPipe.ends[1]->shutdownWrite(); }) 3074 .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); }); 3075 3076 { 3077 HttpHeaderTable table; 3078 FakeEntropySource entropySource; 3079 HttpClientSettings clientSettings; 3080 clientSettings.entropySource = entropySource; 3081 auto backClient = newHttpClient(table, *backPipe.ends[0], clientSettings); 3082 auto frontService = newHttpService(*backClient); 3083 HttpServer frontServer(timer, table, *frontService); 3084 auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1])); 3085 3086 writeA(*frontPipe.ends[0], request.asBytes()).wait(waitScope); 3087 expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope); 3088 3089 expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope); 3090 writeA(*frontPipe.ends[0], WEBSOCKET_SEND_MESSAGE).wait(waitScope); 3091 3092 KJ_EXPECT(frontPipe.ends[0]->readAllText().wait(waitScope) == ""); 3093 3094 frontPipe.ends[0]->shutdownWrite(); 3095 listenTask.wait(waitScope); 3096 } 3097 3098 writeResponsesPromise.wait(waitScope); 3099 } 3100 3101 // ----------------------------------------------------------------------------- 3102 3103 KJ_TEST("newHttpClient from HttpService") { 3104 auto PIPELINE_TESTS = pipelineTestCases(); 3105 3106 KJ_HTTP_TEST_SETUP_IO; 3107 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 3108 3109 HttpHeaderTable table; 3110 TestHttpService service(PIPELINE_TESTS, table); 3111 auto client = newHttpClient(service); 3112 3113 for (auto& testCase: PIPELINE_TESTS) { 3114 testHttpClient(waitScope, table, *client, testCase); 3115 } 3116 } 3117 3118 KJ_TEST("newHttpClient from HttpService WebSockets") { 3119 KJ_HTTP_TEST_SETUP_IO; 3120 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 3121 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 3122 3123 HttpHeaderTable::Builder tableBuilder; 3124 HttpHeaderId hMyHeader = tableBuilder.add("My-Header"); 3125 auto headerTable = tableBuilder.build(); 3126 TestWebSocketService service(*headerTable, hMyHeader); 3127 auto client = newHttpClient(service); 3128 3129 testWebSocketClient(waitScope, *headerTable, hMyHeader, *client); 3130 } 3131 3132 KJ_TEST("adapted client/server propagates request exceptions like non-adapted client") { 3133 KJ_HTTP_TEST_SETUP_IO; 3134 3135 HttpHeaderTable table; 3136 HttpHeaders headers(table); 3137 3138 class FailingHttpClient final: public HttpClient { 3139 public: 3140 Request request( 3141 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 3142 kj::Maybe<uint64_t> expectedBodySize = nullptr) override { 3143 KJ_FAIL_ASSERT("request_fail"); 3144 } 3145 3146 kj::Promise<WebSocketResponse> openWebSocket( 3147 kj::StringPtr url, const HttpHeaders& headers) override { 3148 KJ_FAIL_ASSERT("websocket_fail"); 3149 } 3150 }; 3151 3152 auto rawClient = kj::heap<FailingHttpClient>(); 3153 3154 auto innerClient = kj::heap<FailingHttpClient>(); 3155 auto adaptedService = kj::newHttpService(*innerClient).attach(kj::mv(innerClient)); 3156 auto adaptedClient = kj::newHttpClient(*adaptedService).attach(kj::mv(adaptedService)); 3157 3158 KJ_EXPECT_THROW_MESSAGE("request_fail", rawClient->request(HttpMethod::POST, "/"_kj, headers)); 3159 KJ_EXPECT_THROW_MESSAGE("request_fail", adaptedClient->request(HttpMethod::POST, "/"_kj, headers)); 3160 3161 KJ_EXPECT_THROW_MESSAGE("websocket_fail", rawClient->openWebSocket("/"_kj, headers)); 3162 KJ_EXPECT_THROW_MESSAGE("websocket_fail", adaptedClient->openWebSocket("/"_kj, headers)); 3163 } 3164 3165 class DelayedCompletionHttpService final: public HttpService { 3166 public: 3167 DelayedCompletionHttpService(HttpHeaderTable& table, kj::Maybe<uint64_t> expectedLength) 3168 : table(table), expectedLength(expectedLength) {} 3169 3170 kj::Promise<void> request( 3171 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 3172 kj::AsyncInputStream& requestBody, Response& response) override { 3173 auto stream = response.send(200, "OK", HttpHeaders(table), expectedLength); 3174 auto promise = stream->write("foo", 3); 3175 return promise.attach(kj::mv(stream)).then([this]() { 3176 return kj::mv(paf.promise); 3177 }); 3178 } 3179 3180 kj::PromiseFulfiller<void>& getFulfiller() { return *paf.fulfiller; } 3181 3182 private: 3183 HttpHeaderTable& table; 3184 kj::Maybe<uint64_t> expectedLength; 3185 kj::PromiseFulfillerPair<void> paf = kj::newPromiseAndFulfiller<void>(); 3186 }; 3187 3188 void doDelayedCompletionTest(bool exception, kj::Maybe<uint64_t> expectedLength) noexcept { 3189 KJ_HTTP_TEST_SETUP_IO; 3190 3191 HttpHeaderTable table; 3192 3193 DelayedCompletionHttpService service(table, expectedLength); 3194 auto client = newHttpClient(service); 3195 3196 auto resp = client->request(HttpMethod::GET, "/", HttpHeaders(table), uint64_t(0)) 3197 .response.wait(waitScope); 3198 KJ_EXPECT(resp.statusCode == 200); 3199 3200 // Read "foo" from the response body: works 3201 char buffer[16]; 3202 KJ_ASSERT(resp.body->tryRead(buffer, 1, sizeof(buffer)).wait(waitScope) == 3); 3203 buffer[3] = '\0'; 3204 KJ_EXPECT(buffer == "foo"_kj); 3205 3206 // But reading any more hangs. 3207 auto promise = resp.body->tryRead(buffer, 1, sizeof(buffer)); 3208 3209 KJ_EXPECT(!promise.poll(waitScope)); 3210 3211 // Until we cause the service to return. 3212 if (exception) { 3213 service.getFulfiller().reject(KJ_EXCEPTION(FAILED, "service-side failure")); 3214 } else { 3215 service.getFulfiller().fulfill(); 3216 } 3217 3218 KJ_ASSERT(promise.poll(waitScope)); 3219 3220 if (exception) { 3221 KJ_EXPECT_THROW_MESSAGE("service-side failure", promise.wait(waitScope)); 3222 } else { 3223 promise.wait(waitScope); 3224 } 3225 }; 3226 3227 KJ_TEST("adapted client waits for service to complete before returning EOF on response stream") { 3228 doDelayedCompletionTest(false, uint64_t(3)); 3229 } 3230 3231 KJ_TEST("adapted client waits for service to complete before returning EOF on chunked response") { 3232 doDelayedCompletionTest(false, nullptr); 3233 } 3234 3235 KJ_TEST("adapted client propagates throw from service after complete response body sent") { 3236 doDelayedCompletionTest(true, uint64_t(3)); 3237 } 3238 3239 KJ_TEST("adapted client propagates throw from service after incomplete response body sent") { 3240 doDelayedCompletionTest(true, uint64_t(6)); 3241 } 3242 3243 KJ_TEST("adapted client propagates throw from service after chunked response body sent") { 3244 doDelayedCompletionTest(true, nullptr); 3245 } 3246 3247 class DelayedCompletionWebSocketHttpService final: public HttpService { 3248 public: 3249 DelayedCompletionWebSocketHttpService(HttpHeaderTable& table, bool closeUpstreamFirst) 3250 : table(table), closeUpstreamFirst(closeUpstreamFirst) {} 3251 3252 kj::Promise<void> request( 3253 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 3254 kj::AsyncInputStream& requestBody, Response& response) override { 3255 KJ_ASSERT(headers.isWebSocket()); 3256 3257 auto ws = response.acceptWebSocket(HttpHeaders(table)); 3258 kj::Promise<void> promise = kj::READY_NOW; 3259 if (closeUpstreamFirst) { 3260 // Wait for a close message from the client before starting. 3261 promise = promise.then([&ws = *ws]() { return ws.receive(); }).ignoreResult(); 3262 } 3263 promise = promise 3264 .then([&ws = *ws]() { return ws.send("foo"_kj); }) 3265 .then([&ws = *ws]() { return ws.close(1234, "closed"_kj); }); 3266 if (!closeUpstreamFirst) { 3267 // Wait for a close message from the client at the end. 3268 promise = promise.then([&ws = *ws]() { return ws.receive(); }).ignoreResult(); 3269 } 3270 return promise.attach(kj::mv(ws)).then([this]() { 3271 return kj::mv(paf.promise); 3272 }); 3273 } 3274 3275 kj::PromiseFulfiller<void>& getFulfiller() { return *paf.fulfiller; } 3276 3277 private: 3278 HttpHeaderTable& table; 3279 bool closeUpstreamFirst; 3280 kj::PromiseFulfillerPair<void> paf = kj::newPromiseAndFulfiller<void>(); 3281 }; 3282 3283 void doDelayedCompletionWebSocketTest(bool exception, bool closeUpstreamFirst) noexcept { 3284 KJ_HTTP_TEST_SETUP_IO; 3285 3286 HttpHeaderTable table; 3287 3288 DelayedCompletionWebSocketHttpService service(table, closeUpstreamFirst); 3289 auto client = newHttpClient(service); 3290 3291 auto resp = client->openWebSocket("/", HttpHeaders(table)).wait(waitScope); 3292 auto ws = kj::mv(KJ_ASSERT_NONNULL(resp.webSocketOrBody.tryGet<kj::Own<WebSocket>>())); 3293 3294 if (closeUpstreamFirst) { 3295 // Send "close" immediately. 3296 ws->close(1234, "whatever"_kj).wait(waitScope); 3297 } 3298 3299 // Read "foo" from the WebSocket: works 3300 { 3301 auto msg = ws->receive().wait(waitScope); 3302 KJ_ASSERT(msg.is<kj::String>()); 3303 KJ_ASSERT(msg.get<kj::String>() == "foo"); 3304 } 3305 3306 kj::Promise<void> promise = nullptr; 3307 if (closeUpstreamFirst) { 3308 // Receiving the close hangs. 3309 promise = ws->receive() 3310 .then([](WebSocket::Message&& msg) { KJ_EXPECT(msg.is<WebSocket::Close>()); }); 3311 } else { 3312 auto msg = ws->receive().wait(waitScope); 3313 KJ_ASSERT(msg.is<WebSocket::Close>()); 3314 3315 // Sending a close hangs. 3316 promise = ws->close(1234, "whatever"_kj); 3317 } 3318 KJ_EXPECT(!promise.poll(waitScope)); 3319 3320 // Until we cause the service to return. 3321 if (exception) { 3322 service.getFulfiller().reject(KJ_EXCEPTION(FAILED, "service-side failure")); 3323 } else { 3324 service.getFulfiller().fulfill(); 3325 } 3326 3327 KJ_ASSERT(promise.poll(waitScope)); 3328 3329 if (exception) { 3330 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("service-side failure", promise.wait(waitScope)); 3331 } else { 3332 promise.wait(waitScope); 3333 } 3334 }; 3335 3336 KJ_TEST("adapted client waits for service to complete before completing upstream close on WebSocket") { 3337 doDelayedCompletionWebSocketTest(false, false); 3338 } 3339 3340 KJ_TEST("adapted client waits for service to complete before returning downstream close on WebSocket") { 3341 doDelayedCompletionWebSocketTest(false, true); 3342 } 3343 3344 KJ_TEST("adapted client propagates throw from service after WebSocket upstream close sent") { 3345 doDelayedCompletionWebSocketTest(true, false); 3346 } 3347 3348 KJ_TEST("adapted client propagates throw from service after WebSocket downstream close sent") { 3349 doDelayedCompletionWebSocketTest(true, true); 3350 } 3351 3352 // ----------------------------------------------------------------------------- 3353 3354 class CountingIoStream final: public kj::AsyncIoStream { 3355 // An AsyncIoStream wrapper which decrements a counter when destroyed (allowing us to count how 3356 // many connections are open). 3357 3358 public: 3359 CountingIoStream(kj::Own<kj::AsyncIoStream> inner, uint& count) 3360 : inner(kj::mv(inner)), count(count) {} 3361 ~CountingIoStream() noexcept(false) { 3362 --count; 3363 } 3364 3365 kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override { 3366 return inner->read(buffer, minBytes, maxBytes); 3367 } 3368 kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { 3369 return inner->tryRead(buffer, minBytes, maxBytes); 3370 } 3371 kj::Maybe<uint64_t> tryGetLength() override { 3372 return inner->tryGetLength();; 3373 } 3374 kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override { 3375 return inner->pumpTo(output, amount); 3376 } 3377 kj::Promise<void> write(const void* buffer, size_t size) override { 3378 return inner->write(buffer, size); 3379 } 3380 kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override { 3381 return inner->write(pieces); 3382 } 3383 kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom( 3384 kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override { 3385 return inner->tryPumpFrom(input, amount); 3386 } 3387 Promise<void> whenWriteDisconnected() override { 3388 return inner->whenWriteDisconnected(); 3389 } 3390 void shutdownWrite() override { 3391 return inner->shutdownWrite(); 3392 } 3393 void abortRead() override { 3394 return inner->abortRead(); 3395 } 3396 3397 public: 3398 kj::Own<AsyncIoStream> inner; 3399 uint& count; 3400 }; 3401 3402 class CountingNetworkAddress final: public kj::NetworkAddress { 3403 public: 3404 CountingNetworkAddress(kj::NetworkAddress& inner, uint& count, uint& cumulative) 3405 : inner(inner), count(count), addrCount(ownAddrCount), cumulative(cumulative) {} 3406 CountingNetworkAddress(kj::Own<kj::NetworkAddress> inner, uint& count, uint& addrCount) 3407 : inner(*inner), ownInner(kj::mv(inner)), count(count), addrCount(addrCount), 3408 cumulative(ownCumulative) {} 3409 ~CountingNetworkAddress() noexcept(false) { 3410 --addrCount; 3411 } 3412 3413 kj::Promise<kj::Own<kj::AsyncIoStream>> connect() override { 3414 ++count; 3415 ++cumulative; 3416 return inner.connect() 3417 .then([this](kj::Own<kj::AsyncIoStream> stream) -> kj::Own<kj::AsyncIoStream> { 3418 return kj::heap<CountingIoStream>(kj::mv(stream), count); 3419 }); 3420 } 3421 3422 kj::Own<kj::ConnectionReceiver> listen() override { KJ_UNIMPLEMENTED("test"); } 3423 kj::Own<kj::NetworkAddress> clone() override { KJ_UNIMPLEMENTED("test"); } 3424 kj::String toString() override { KJ_UNIMPLEMENTED("test"); } 3425 3426 private: 3427 kj::NetworkAddress& inner; 3428 kj::Own<kj::NetworkAddress> ownInner; 3429 uint& count; 3430 uint ownAddrCount = 1; 3431 uint& addrCount; 3432 uint ownCumulative = 0; 3433 uint& cumulative; 3434 }; 3435 3436 class ConnectionCountingNetwork final: public kj::Network { 3437 public: 3438 ConnectionCountingNetwork(kj::Network& inner, uint& count, uint& addrCount) 3439 : inner(inner), count(count), addrCount(addrCount) {} 3440 3441 Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override { 3442 ++addrCount; 3443 return inner.parseAddress(addr, portHint) 3444 .then([this](Own<NetworkAddress>&& addr) -> Own<NetworkAddress> { 3445 return kj::heap<CountingNetworkAddress>(kj::mv(addr), count, addrCount); 3446 }); 3447 } 3448 Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override { 3449 KJ_UNIMPLEMENTED("test"); 3450 } 3451 Own<Network> restrictPeers( 3452 kj::ArrayPtr<const kj::StringPtr> allow, 3453 kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override { 3454 KJ_UNIMPLEMENTED("test"); 3455 } 3456 3457 private: 3458 kj::Network& inner; 3459 uint& count; 3460 uint& addrCount; 3461 }; 3462 3463 class DummyService final: public HttpService { 3464 public: 3465 DummyService(HttpHeaderTable& headerTable): headerTable(headerTable) {} 3466 3467 kj::Promise<void> request( 3468 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 3469 kj::AsyncInputStream& requestBody, Response& response) override { 3470 if (!headers.isWebSocket()) { 3471 if (url == "/throw") { 3472 return KJ_EXCEPTION(FAILED, "client requested failure"); 3473 } 3474 3475 auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url); 3476 auto stream = response.send(200, "OK", HttpHeaders(headerTable), body.size()); 3477 auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2); 3478 promises.add(stream->write(body.begin(), body.size())); 3479 promises.add(requestBody.readAllBytes().ignoreResult()); 3480 return kj::joinPromises(promises.finish()).attach(kj::mv(stream), kj::mv(body)); 3481 } else { 3482 auto ws = response.acceptWebSocket(HttpHeaders(headerTable)); 3483 auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url); 3484 auto sendPromise = ws->send(body); 3485 3486 auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2); 3487 promises.add(sendPromise.attach(kj::mv(body))); 3488 promises.add(ws->receive().ignoreResult()); 3489 return kj::joinPromises(promises.finish()).attach(kj::mv(ws)); 3490 } 3491 } 3492 3493 private: 3494 HttpHeaderTable& headerTable; 3495 }; 3496 3497 KJ_TEST("HttpClient connection management") { 3498 KJ_HTTP_TEST_SETUP_IO; 3499 KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR; 3500 3501 kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>()); 3502 kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>()); 3503 HttpHeaderTable headerTable; 3504 3505 DummyService service(headerTable); 3506 HttpServerSettings serverSettings; 3507 HttpServer server(serverTimer, headerTable, service, serverSettings); 3508 auto listenTask = server.listenHttp(*listener); 3509 3510 uint count = 0; 3511 uint cumulative = 0; 3512 CountingNetworkAddress countingAddr(*addr, count, cumulative); 3513 3514 FakeEntropySource entropySource; 3515 HttpClientSettings clientSettings; 3516 clientSettings.entropySource = entropySource; 3517 auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings); 3518 3519 KJ_EXPECT(count == 0); 3520 KJ_EXPECT(cumulative == 0); 3521 3522 uint i = 0; 3523 auto doRequest = [&]() { 3524 uint n = i++; 3525 return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response 3526 .then([](HttpClient::Response&& response) { 3527 auto promise = response.body->readAllText(); 3528 return promise.attach(kj::mv(response.body)); 3529 }).then([n](kj::String body) { 3530 KJ_EXPECT(body == kj::str("null:/", n)); 3531 }); 3532 }; 3533 3534 // We can do several requests in a row and only have one connection. 3535 doRequest().wait(waitScope); 3536 doRequest().wait(waitScope); 3537 doRequest().wait(waitScope); 3538 KJ_EXPECT(count == 1); 3539 KJ_EXPECT(cumulative == 1); 3540 3541 // But if we do two in parallel, we'll end up with two connections. 3542 auto req1 = doRequest(); 3543 auto req2 = doRequest(); 3544 req1.wait(waitScope); 3545 req2.wait(waitScope); 3546 KJ_EXPECT(count == 2); 3547 KJ_EXPECT(cumulative == 2); 3548 3549 // We can reuse after a POST, provided we write the whole POST body properly. 3550 { 3551 auto req = client->request( 3552 HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6)); 3553 req.body->write("foobar", 6).wait(waitScope); 3554 req.response.wait(waitScope).body->readAllBytes().wait(waitScope); 3555 } 3556 KJ_EXPECT(count == 2); 3557 KJ_EXPECT(cumulative == 2); 3558 doRequest().wait(waitScope); 3559 KJ_EXPECT(count == 2); 3560 KJ_EXPECT(cumulative == 2); 3561 3562 // Advance time for half the timeout, then exercise one of the connections. 3563 clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout / 2); 3564 doRequest().wait(waitScope); 3565 doRequest().wait(waitScope); 3566 waitScope.poll(); 3567 KJ_EXPECT(count == 2); 3568 KJ_EXPECT(cumulative == 2); 3569 3570 // Advance time past when the other connection should time out. It should be dropped. 3571 clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout * 3 / 4); 3572 waitScope.poll(); 3573 KJ_EXPECT(count == 1); 3574 KJ_EXPECT(cumulative == 2); 3575 3576 // Wait for the other to drop. 3577 clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout / 2); 3578 waitScope.poll(); 3579 KJ_EXPECT(count == 0); 3580 KJ_EXPECT(cumulative == 2); 3581 3582 // New request creates a new connection again. 3583 doRequest().wait(waitScope); 3584 KJ_EXPECT(count == 1); 3585 KJ_EXPECT(cumulative == 3); 3586 3587 // WebSocket connections are not reused. 3588 client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable)) 3589 .wait(waitScope); 3590 KJ_EXPECT(count == 0); 3591 KJ_EXPECT(cumulative == 3); 3592 3593 // Errored connections are not reused. 3594 doRequest().wait(waitScope); 3595 KJ_EXPECT(count == 1); 3596 KJ_EXPECT(cumulative == 4); 3597 client->request(HttpMethod::GET, kj::str("/throw"), HttpHeaders(headerTable)).response 3598 .wait(waitScope).body->readAllBytes().wait(waitScope); 3599 KJ_EXPECT(count == 0); 3600 KJ_EXPECT(cumulative == 4); 3601 3602 // Connections where we failed to read the full response body are not reused. 3603 doRequest().wait(waitScope); 3604 KJ_EXPECT(count == 1); 3605 KJ_EXPECT(cumulative == 5); 3606 client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable)).response 3607 .wait(waitScope); 3608 KJ_EXPECT(count == 0); 3609 KJ_EXPECT(cumulative == 5); 3610 3611 // Connections where we didn't even wait for the response headers are not reused. 3612 doRequest().wait(waitScope); 3613 KJ_EXPECT(count == 1); 3614 KJ_EXPECT(cumulative == 6); 3615 client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable)); 3616 KJ_EXPECT(count == 0); 3617 KJ_EXPECT(cumulative == 6); 3618 3619 // Connections where we failed to write the full request body are not reused. 3620 doRequest().wait(waitScope); 3621 KJ_EXPECT(count == 1); 3622 KJ_EXPECT(cumulative == 7); 3623 client->request(HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6)).response 3624 .wait(waitScope).body->readAllBytes().wait(waitScope); 3625 KJ_EXPECT(count == 0); 3626 KJ_EXPECT(cumulative == 7); 3627 3628 // If the server times out the connection, we figure it out on the client. 3629 doRequest().wait(waitScope); 3630 3631 // TODO(someday): Figure out why the following poll is necessary for the test to pass on Windows 3632 // and Mac. Without it, it seems that the request's connection never starts, so the 3633 // subsequent advanceTo() does not actually time out the connection. 3634 waitScope.poll(); 3635 3636 KJ_EXPECT(count == 1); 3637 KJ_EXPECT(cumulative == 8); 3638 serverTimer.advanceTo(serverTimer.now() + serverSettings.pipelineTimeout * 2); 3639 waitScope.poll(); 3640 KJ_EXPECT(count == 0); 3641 KJ_EXPECT(cumulative == 8); 3642 3643 // Can still make requests. 3644 doRequest().wait(waitScope); 3645 KJ_EXPECT(count == 1); 3646 KJ_EXPECT(cumulative == 9); 3647 } 3648 3649 KJ_TEST("HttpClient disable connection reuse") { 3650 KJ_HTTP_TEST_SETUP_IO; 3651 KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR; 3652 3653 kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>()); 3654 kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>()); 3655 HttpHeaderTable headerTable; 3656 3657 DummyService service(headerTable); 3658 HttpServerSettings serverSettings; 3659 HttpServer server(serverTimer, headerTable, service, serverSettings); 3660 auto listenTask = server.listenHttp(*listener); 3661 3662 uint count = 0; 3663 uint cumulative = 0; 3664 CountingNetworkAddress countingAddr(*addr, count, cumulative); 3665 3666 FakeEntropySource entropySource; 3667 HttpClientSettings clientSettings; 3668 clientSettings.entropySource = entropySource; 3669 clientSettings.idleTimeout = 0 * kj::SECONDS; 3670 auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings); 3671 3672 KJ_EXPECT(count == 0); 3673 KJ_EXPECT(cumulative == 0); 3674 3675 uint i = 0; 3676 auto doRequest = [&]() { 3677 uint n = i++; 3678 return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response 3679 .then([](HttpClient::Response&& response) { 3680 auto promise = response.body->readAllText(); 3681 return promise.attach(kj::mv(response.body)); 3682 }).then([n](kj::String body) { 3683 KJ_EXPECT(body == kj::str("null:/", n)); 3684 }); 3685 }; 3686 3687 // Each serial request gets its own connection. 3688 doRequest().wait(waitScope); 3689 doRequest().wait(waitScope); 3690 doRequest().wait(waitScope); 3691 KJ_EXPECT(count == 0); 3692 KJ_EXPECT(cumulative == 3); 3693 3694 // Each parallel request gets its own connection. 3695 auto req1 = doRequest(); 3696 auto req2 = doRequest(); 3697 req1.wait(waitScope); 3698 req2.wait(waitScope); 3699 KJ_EXPECT(count == 0); 3700 KJ_EXPECT(cumulative == 5); 3701 } 3702 3703 KJ_TEST("HttpClient concurrency limiting") { 3704 #if KJ_HTTP_TEST_USE_OS_PIPE && !__linux__ 3705 // On Windows and Mac, OS event delivery is not always immediate, and that seems to make this 3706 // test flakey. On Linux, events are always immediately delivered. For now, we compile the test 3707 // but we don't run it outside of Linux. We do run the in-memory-pipes version on all OSs since 3708 // that mode shouldn't depend on kernel behavior at all. 3709 return; 3710 #endif 3711 3712 KJ_HTTP_TEST_SETUP_IO; 3713 KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR; 3714 3715 kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>()); 3716 kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>()); 3717 HttpHeaderTable headerTable; 3718 3719 DummyService service(headerTable); 3720 HttpServerSettings serverSettings; 3721 HttpServer server(serverTimer, headerTable, service, serverSettings); 3722 auto listenTask = server.listenHttp(*listener); 3723 3724 uint count = 0; 3725 uint cumulative = 0; 3726 CountingNetworkAddress countingAddr(*addr, count, cumulative); 3727 3728 FakeEntropySource entropySource; 3729 HttpClientSettings clientSettings; 3730 clientSettings.entropySource = entropySource; 3731 clientSettings.idleTimeout = 0 * kj::SECONDS; 3732 auto innerClient = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings); 3733 3734 struct CallbackEvent { 3735 uint runningCount; 3736 uint pendingCount; 3737 3738 bool operator==(const CallbackEvent& other) const { 3739 return runningCount == other.runningCount && pendingCount == other.pendingCount; 3740 } 3741 bool operator!=(const CallbackEvent& other) const { return !(*this == other); } 3742 // TODO(someday): Can use default spaceship operator in C++20: 3743 //auto operator<=>(const CallbackEvent&) const = default; 3744 }; 3745 3746 kj::Vector<CallbackEvent> callbackEvents; 3747 auto callback = [&](uint runningCount, uint pendingCount) { 3748 callbackEvents.add(CallbackEvent{runningCount, pendingCount}); 3749 }; 3750 auto client = newConcurrencyLimitingHttpClient(*innerClient, 1, kj::mv(callback)); 3751 3752 KJ_EXPECT(count == 0); 3753 KJ_EXPECT(cumulative == 0); 3754 3755 uint i = 0; 3756 auto doRequest = [&]() { 3757 uint n = i++; 3758 return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response 3759 .then([](HttpClient::Response&& response) { 3760 auto promise = response.body->readAllText(); 3761 return promise.attach(kj::mv(response.body)); 3762 }).then([n](kj::String body) { 3763 KJ_EXPECT(body == kj::str("null:/", n)); 3764 }); 3765 }; 3766 3767 // Second connection blocked by first. 3768 auto req1 = doRequest(); 3769 3770 KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} })); 3771 callbackEvents.clear(); 3772 3773 auto req2 = doRequest(); 3774 3775 // TODO(someday): Figure out why this poll() is necessary on Windows and macOS. 3776 waitScope.poll(); 3777 3778 KJ_EXPECT(req1.poll(waitScope)); 3779 KJ_EXPECT(!req2.poll(waitScope)); 3780 KJ_EXPECT(count == 1); 3781 KJ_EXPECT(cumulative == 1); 3782 KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 1} })); 3783 callbackEvents.clear(); 3784 3785 // Releasing first connection allows second to start. 3786 req1.wait(waitScope); 3787 KJ_EXPECT(req2.poll(waitScope)); 3788 KJ_EXPECT(count == 1); 3789 KJ_EXPECT(cumulative == 2); 3790 KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} })); 3791 callbackEvents.clear(); 3792 3793 req2.wait(waitScope); 3794 KJ_EXPECT(count == 0); 3795 KJ_EXPECT(cumulative == 2); 3796 KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {0, 0} })); 3797 callbackEvents.clear(); 3798 3799 // Using body stream after releasing blocked response promise throws no exception 3800 auto req3 = doRequest(); 3801 { 3802 kj::Own<kj::AsyncOutputStream> req4Body; 3803 { 3804 auto req4 = client->request(HttpMethod::GET, kj::str("/", ++i), HttpHeaders(headerTable)); 3805 waitScope.poll(); 3806 req4Body = kj::mv(req4.body); 3807 } 3808 auto writePromise = req4Body->write("a", 1); 3809 KJ_EXPECT(!writePromise.poll(waitScope)); 3810 } 3811 req3.wait(waitScope); 3812 KJ_EXPECT(count == 0); 3813 KJ_EXPECT(cumulative == 3); 3814 3815 // Similar connection limiting for web sockets 3816 // TODO(someday): Figure out why the sequencing of websockets events does 3817 // not work correctly on Windows (and maybe macOS?). The solution is not as 3818 // simple as inserting poll()s as above, since doing so puts the websocket in 3819 // a state that trips a "previous HTTP message body incomplete" assertion, 3820 // while trying to write 500 network response. 3821 callbackEvents.clear(); 3822 auto ws1 = kj::heap(client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable))); 3823 KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} })); 3824 callbackEvents.clear(); 3825 auto ws2 = kj::heap(client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable))); 3826 KJ_EXPECT(ws1->poll(waitScope)); 3827 KJ_EXPECT(!ws2->poll(waitScope)); 3828 KJ_EXPECT(count == 1); 3829 KJ_EXPECT(cumulative == 4); 3830 KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 1} })); 3831 callbackEvents.clear(); 3832 3833 { 3834 auto response1 = ws1->wait(waitScope); 3835 KJ_EXPECT(!ws2->poll(waitScope)); 3836 KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({})); 3837 } 3838 KJ_EXPECT(ws2->poll(waitScope)); 3839 KJ_EXPECT(count == 1); 3840 KJ_EXPECT(cumulative == 5); 3841 KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} })); 3842 callbackEvents.clear(); 3843 { 3844 auto response2 = ws2->wait(waitScope); 3845 KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({})); 3846 } 3847 KJ_EXPECT(count == 0); 3848 KJ_EXPECT(cumulative == 5); 3849 KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {0, 0} })); 3850 } 3851 3852 #if KJ_HTTP_TEST_USE_OS_PIPE 3853 // TODO(someday): Implement mock kj::Network for userspace version of this test? 3854 KJ_TEST("HttpClient multi host") { 3855 auto io = kj::setupAsyncIo(); 3856 3857 kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>()); 3858 kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>()); 3859 HttpHeaderTable headerTable; 3860 3861 auto listener1 = io.provider->getNetwork().parseAddress("localhost", 0) 3862 .wait(io.waitScope)->listen(); 3863 auto listener2 = io.provider->getNetwork().parseAddress("localhost", 0) 3864 .wait(io.waitScope)->listen(); 3865 DummyService service(headerTable); 3866 HttpServer server(serverTimer, headerTable, service); 3867 auto listenTask1 = server.listenHttp(*listener1); 3868 auto listenTask2 = server.listenHttp(*listener2); 3869 3870 uint count = 0, addrCount = 0; 3871 uint tlsCount = 0, tlsAddrCount = 0; 3872 ConnectionCountingNetwork countingNetwork(io.provider->getNetwork(), count, addrCount); 3873 ConnectionCountingNetwork countingTlsNetwork(io.provider->getNetwork(), tlsCount, tlsAddrCount); 3874 3875 HttpClientSettings clientSettings; 3876 auto client = newHttpClient(clientTimer, headerTable, 3877 countingNetwork, countingTlsNetwork, clientSettings); 3878 3879 KJ_EXPECT(count == 0); 3880 3881 uint i = 0; 3882 auto doRequest = [&](bool tls, uint port) { 3883 uint n = i++; 3884 // We stick a double-slash in the URL to test that it doesn't get coalesced into one slash, 3885 // which was a bug in the past. 3886 return client->request(HttpMethod::GET, 3887 kj::str((tls ? "https://localhost:" : "http://localhost:"), port, "//", n), 3888 HttpHeaders(headerTable)).response 3889 .then([](HttpClient::Response&& response) { 3890 auto promise = response.body->readAllText(); 3891 return promise.attach(kj::mv(response.body)); 3892 }).then([n, port](kj::String body) { 3893 KJ_EXPECT(body == kj::str("localhost:", port, "://", n), body, port, n); 3894 }); 3895 }; 3896 3897 uint port1 = listener1->getPort(); 3898 uint port2 = listener2->getPort(); 3899 3900 // We can do several requests in a row to the same host and only have one connection. 3901 doRequest(false, port1).wait(io.waitScope); 3902 doRequest(false, port1).wait(io.waitScope); 3903 doRequest(false, port1).wait(io.waitScope); 3904 KJ_EXPECT(count == 1); 3905 KJ_EXPECT(tlsCount == 0); 3906 KJ_EXPECT(addrCount == 1); 3907 KJ_EXPECT(tlsAddrCount == 0); 3908 3909 // Request a different host, and now we have two connections. 3910 doRequest(false, port2).wait(io.waitScope); 3911 KJ_EXPECT(count == 2); 3912 KJ_EXPECT(tlsCount == 0); 3913 KJ_EXPECT(addrCount == 2); 3914 KJ_EXPECT(tlsAddrCount == 0); 3915 3916 // Try TLS. 3917 doRequest(true, port1).wait(io.waitScope); 3918 KJ_EXPECT(count == 2); 3919 KJ_EXPECT(tlsCount == 1); 3920 KJ_EXPECT(addrCount == 2); 3921 KJ_EXPECT(tlsAddrCount == 1); 3922 3923 // Try first host again, no change in connection count. 3924 doRequest(false, port1).wait(io.waitScope); 3925 KJ_EXPECT(count == 2); 3926 KJ_EXPECT(tlsCount == 1); 3927 KJ_EXPECT(addrCount == 2); 3928 KJ_EXPECT(tlsAddrCount == 1); 3929 3930 // Multiple requests in parallel forces more connections to that host. 3931 auto promise1 = doRequest(false, port1); 3932 auto promise2 = doRequest(false, port1); 3933 promise1.wait(io.waitScope); 3934 promise2.wait(io.waitScope); 3935 KJ_EXPECT(count == 3); 3936 KJ_EXPECT(tlsCount == 1); 3937 KJ_EXPECT(addrCount == 2); 3938 KJ_EXPECT(tlsAddrCount == 1); 3939 3940 // Let everything expire. 3941 clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout * 2); 3942 io.waitScope.poll(); 3943 KJ_EXPECT(count == 0); 3944 KJ_EXPECT(tlsCount == 0); 3945 KJ_EXPECT(addrCount == 0); 3946 KJ_EXPECT(tlsAddrCount == 0); 3947 3948 // We can still request those hosts again. 3949 doRequest(false, port1).wait(io.waitScope); 3950 KJ_EXPECT(count == 1); 3951 KJ_EXPECT(tlsCount == 0); 3952 KJ_EXPECT(addrCount == 1); 3953 KJ_EXPECT(tlsAddrCount == 0); 3954 } 3955 #endif 3956 3957 // ----------------------------------------------------------------------------- 3958 3959 #if KJ_HTTP_TEST_USE_OS_PIPE 3960 // This test only makes sense using the real network. 3961 KJ_TEST("HttpClient to capnproto.org") { 3962 auto io = kj::setupAsyncIo(); 3963 3964 auto maybeConn = io.provider->getNetwork().parseAddress("capnproto.org", 80) 3965 .then([](kj::Own<kj::NetworkAddress> addr) { 3966 auto promise = addr->connect(); 3967 return promise.attach(kj::mv(addr)); 3968 }).then([](kj::Own<kj::AsyncIoStream>&& connection) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> { 3969 return kj::mv(connection); 3970 }, [](kj::Exception&& e) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> { 3971 KJ_LOG(WARNING, "skipping test because couldn't connect to capnproto.org"); 3972 return nullptr; 3973 }).wait(io.waitScope); 3974 3975 KJ_IF_MAYBE(conn, maybeConn) { 3976 // Successfully connected to capnproto.org. Try doing GET /. We expect to get a redirect to 3977 // HTTPS, because what kind of horrible web site would serve in plaintext, really? 3978 3979 HttpHeaderTable table; 3980 auto client = newHttpClient(table, **conn); 3981 3982 HttpHeaders headers(table); 3983 headers.set(HttpHeaderId::HOST, "capnproto.org"); 3984 3985 auto response = client->request(HttpMethod::GET, "/", headers).response.wait(io.waitScope); 3986 KJ_EXPECT(response.statusCode / 100 == 3); 3987 auto location = KJ_ASSERT_NONNULL(response.headers->get(HttpHeaderId::LOCATION)); 3988 KJ_EXPECT(location == "https://capnproto.org/"); 3989 3990 auto body = response.body->readAllText().wait(io.waitScope); 3991 } 3992 } 3993 #endif 3994 3995 // ======================================================================================= 3996 // Misc bugfix tests 3997 3998 class ReadCancelHttpService final: public HttpService { 3999 // HttpService that tries to read all request data but cancels after 1ms and sends a response. 4000 public: 4001 ReadCancelHttpService(kj::Timer& timer, HttpHeaderTable& headerTable) 4002 : timer(timer), headerTable(headerTable) {} 4003 4004 kj::Promise<void> request( 4005 HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, 4006 kj::AsyncInputStream& requestBody, Response& responseSender) override { 4007 if (method == HttpMethod::POST) { 4008 // Try to read all content, but cancel after 1ms. 4009 return requestBody.readAllBytes().ignoreResult() 4010 .exclusiveJoin(timer.afterDelay(1 * kj::MILLISECONDS)) 4011 .then([this, &responseSender]() { 4012 responseSender.send(408, "Request Timeout", kj::HttpHeaders(headerTable), uint64_t(0)); 4013 }); 4014 } else { 4015 responseSender.send(200, "OK", kj::HttpHeaders(headerTable), uint64_t(0)); 4016 return kj::READY_NOW; 4017 } 4018 } 4019 4020 private: 4021 kj::Timer& timer; 4022 HttpHeaderTable& headerTable; 4023 }; 4024 4025 KJ_TEST("canceling a length stream mid-read correctly discards rest of request") { 4026 KJ_HTTP_TEST_SETUP_IO; 4027 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 4028 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 4029 4030 HttpHeaderTable table; 4031 ReadCancelHttpService service(timer, table); 4032 HttpServer server(timer, table, service); 4033 4034 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 4035 4036 { 4037 static constexpr kj::StringPtr REQUEST = 4038 "POST / HTTP/1.1\r\n" 4039 "Content-Length: 6\r\n" 4040 "\r\n" 4041 "fooba"_kj; // incomplete 4042 pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope); 4043 4044 auto promise = expectRead(*pipe.ends[1], 4045 "HTTP/1.1 408 Request Timeout\r\n" 4046 "Content-Length: 0\r\n" 4047 "\r\n"_kj); 4048 4049 KJ_EXPECT(!promise.poll(waitScope)); 4050 4051 // Trigger timout, then response should be sent. 4052 timer.advanceTo(timer.now() + 1 * kj::MILLISECONDS); 4053 KJ_ASSERT(promise.poll(waitScope)); 4054 promise.wait(waitScope); 4055 } 4056 4057 // We left our request stream hanging. The server will try to read and discard the request body. 4058 // Let's give it the rest of the data, followed by a second request. 4059 { 4060 static constexpr kj::StringPtr REQUEST = 4061 "r" 4062 "GET / HTTP/1.1\r\n" 4063 "\r\n"_kj; 4064 pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope); 4065 4066 auto promise = expectRead(*pipe.ends[1], 4067 "HTTP/1.1 200 OK\r\n" 4068 "Content-Length: 0\r\n" 4069 "\r\n"_kj); 4070 KJ_ASSERT(promise.poll(waitScope)); 4071 promise.wait(waitScope); 4072 } 4073 } 4074 4075 KJ_TEST("canceling a chunked stream mid-read correctly discards rest of request") { 4076 KJ_HTTP_TEST_SETUP_IO; 4077 kj::TimerImpl timer(kj::origin<kj::TimePoint>()); 4078 auto pipe = KJ_HTTP_TEST_CREATE_2PIPE; 4079 4080 HttpHeaderTable table; 4081 ReadCancelHttpService service(timer, table); 4082 HttpServer server(timer, table, service); 4083 4084 auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); 4085 4086 { 4087 static constexpr kj::StringPtr REQUEST = 4088 "POST / HTTP/1.1\r\n" 4089 "Transfer-Encoding: chunked\r\n" 4090 "\r\n" 4091 "6\r\n" 4092 "fooba"_kj; // incomplete chunk 4093 pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope); 4094 4095 auto promise = expectRead(*pipe.ends[1], 4096 "HTTP/1.1 408 Request Timeout\r\n" 4097 "Content-Length: 0\r\n" 4098 "\r\n"_kj); 4099 4100 KJ_EXPECT(!promise.poll(waitScope)); 4101 4102 // Trigger timout, then response should be sent. 4103 timer.advanceTo(timer.now() + 1 * kj::MILLISECONDS); 4104 KJ_ASSERT(promise.poll(waitScope)); 4105 promise.wait(waitScope); 4106 } 4107 4108 // We left our request stream hanging. The server will try to read and discard the request body. 4109 // Let's give it the rest of the data, followed by a second request. 4110 { 4111 static constexpr kj::StringPtr REQUEST = 4112 "r\r\n" 4113 "4a\r\n" 4114 "this is some text that is the body of a chunk and not a valid chunk header\r\n" 4115 "0\r\n" 4116 "\r\n" 4117 "GET / HTTP/1.1\r\n" 4118 "\r\n"_kj; 4119 pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope); 4120 4121 auto promise = expectRead(*pipe.ends[1], 4122 "HTTP/1.1 200 OK\r\n" 4123 "Content-Length: 0\r\n" 4124 "\r\n"_kj); 4125 KJ_ASSERT(promise.poll(waitScope)); 4126 promise.wait(waitScope); 4127 } 4128 } 4129 4130 } // namespace 4131 } // namespace kj