reconnect-test.c++ (7513B)
1 // Copyright (c) 2020 Cloudflare, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #include "reconnect.h" 23 #include "test-util.h" 24 #include <kj/debug.h> 25 #include <kj/test.h> 26 #include <kj/async-io.h> 27 #include "rpc-twoparty.h" 28 29 namespace capnp { 30 namespace _ { 31 namespace { 32 33 class TestInterfaceImpl final: public test::TestInterface::Server { 34 public: 35 TestInterfaceImpl(uint generation): generation(generation) {} 36 37 void setError(kj::Exception e) { 38 error = kj::mv(e); 39 } 40 41 kj::Own<kj::PromiseFulfiller<void>> block() { 42 auto paf = kj::newPromiseAndFulfiller<void>(); 43 blocker = paf.promise.fork(); 44 return kj::mv(paf.fulfiller); 45 } 46 47 protected: 48 kj::Promise<void> foo(FooContext context) override { 49 KJ_IF_MAYBE(e, error) { 50 return kj::cp(*e); 51 } 52 auto params = context.getParams(); 53 context.initResults().setX(kj::str(params.getI(), ' ', params.getJ(), ' ', generation)); 54 return blocker.addBranch(); 55 } 56 57 private: 58 uint generation; 59 kj::Maybe<kj::Exception> error; 60 kj::ForkedPromise<void> blocker = kj::Promise<void>(kj::READY_NOW).fork(); 61 }; 62 63 void doAutoReconnectTest(kj::WaitScope& ws, 64 kj::Function<test::TestInterface::Client(test::TestInterface::Client)> wrapClient) { 65 TestInterfaceImpl* currentServer = nullptr; 66 uint connectCount = 0; 67 68 test::TestInterface::Client client = wrapClient(autoReconnect([&]() { 69 auto server = kj::heap<TestInterfaceImpl>(connectCount++); 70 currentServer = server; 71 return test::TestInterface::Client(kj::mv(server)); 72 })); 73 74 auto testPromise = [&](uint i, bool j) { 75 auto req = client.fooRequest(); 76 req.setI(i); 77 req.setJ(j); 78 return req.send(); 79 }; 80 81 auto test = [&](uint i, bool j) { 82 return kj::str(testPromise(i, j).wait(ws).getX()); 83 }; 84 85 KJ_EXPECT(test(123, true) == "123 true 0"); 86 87 currentServer->setError(KJ_EXCEPTION(DISCONNECTED, "test1 disconnect")); 88 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test1 disconnect", test(456, true)); 89 90 KJ_EXPECT(test(789, false) == "789 false 1"); 91 KJ_EXPECT(test(21, true) == "21 true 1"); 92 93 { 94 // We cause two disconnect promises to be thrown concurrently. This should only cause the 95 // reconnector to reconnect once, not twice. 96 auto fulfiller = currentServer->block(); 97 auto promise1 = testPromise(32, false); 98 auto promise2 = testPromise(43, true); 99 KJ_EXPECT(!promise1.poll(ws)); 100 KJ_EXPECT(!promise2.poll(ws)); 101 fulfiller->reject(KJ_EXCEPTION(DISCONNECTED, "test2 disconnect")); 102 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test2 disconnect", promise1.wait(ws)); 103 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test2 disconnect", promise2.wait(ws)); 104 } 105 106 KJ_EXPECT(test(43, false) == "43 false 2"); 107 108 // Start a couple calls that will block at the server end, plus an unsent request. 109 auto fulfiller = currentServer->block(); 110 111 auto promise1 = testPromise(1212, true); 112 auto promise2 = testPromise(3434, false); 113 auto req3 = client.fooRequest(); 114 req3.setI(5656); 115 req3.setJ(true); 116 KJ_EXPECT(!promise1.poll(ws)); 117 KJ_EXPECT(!promise2.poll(ws)); 118 119 // Now force a reconnect. 120 currentServer->setError(KJ_EXCEPTION(DISCONNECTED, "test3 disconnect")); 121 122 // Initiate a request that will fail with DISCONNECTED. 123 auto promise4 = testPromise(7878, false); 124 125 // And throw away our capability entirely, just to make sure that anyone who needs it is holding 126 // onto their own ref. 127 client = nullptr; 128 129 // Everything we initiated should still finish. 130 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test3 disconnect", promise4.wait(ws)); 131 132 // Send the request which we created before the disconnect. There are two behaviors we accept 133 // as correct here: it may throw the disconnect exception, or it may automatically redirect to 134 // the newly-reconnected destination. 135 req3.send().then([](Response<test::TestInterface::FooResults> resp) { 136 KJ_EXPECT(resp.getX() == "5656 true 3"); 137 }, [](kj::Exception e) { 138 KJ_EXPECT(e.getDescription().endsWith("test3 disconnect")); 139 }).wait(ws); 140 141 KJ_EXPECT(!promise1.poll(ws)); 142 KJ_EXPECT(!promise2.poll(ws)); 143 fulfiller->fulfill(); 144 KJ_EXPECT(promise1.wait(ws).getX() == "1212 true 2"); 145 KJ_EXPECT(promise2.wait(ws).getX() == "3434 false 2"); 146 } 147 148 KJ_TEST("autoReconnect() direct call (exercises newCall() / RequestHook)") { 149 kj::EventLoop loop; 150 kj::WaitScope ws(loop); 151 152 doAutoReconnectTest(ws, [](auto c) {return kj::mv(c);}); 153 } 154 155 KJ_TEST("autoReconnect() through RPC (exercises call() / CallContextHook)") { 156 kj::EventLoop loop; 157 kj::WaitScope ws(loop); 158 159 auto paf = kj::newPromiseAndFulfiller<test::TestInterface::Client>(); 160 161 auto pipe = kj::newTwoWayPipe(); 162 TwoPartyClient client(*pipe.ends[0]); 163 TwoPartyClient server(*pipe.ends[1], kj::mv(paf.promise), rpc::twoparty::Side::SERVER); 164 165 doAutoReconnectTest(ws, [&](test::TestInterface::Client c) { 166 paf.fulfiller->fulfill(kj::mv(c)); 167 return client.bootstrap().castAs<test::TestInterface>(); 168 }); 169 } 170 171 KJ_TEST("lazyAutoReconnect() direct call (exercises newCall() / RequestHook)") { 172 kj::EventLoop loop; 173 kj::WaitScope ws(loop); 174 175 doAutoReconnectTest(ws, [](auto c) {return kj::mv(c);}); 176 } 177 178 KJ_TEST("lazyAutoReconnect() initialies lazily") { 179 kj::EventLoop loop; 180 kj::WaitScope ws(loop); 181 182 int connectCount = 0; 183 TestInterfaceImpl* currentServer = nullptr; 184 auto connectCounter = [&]() { 185 auto server = kj::heap<TestInterfaceImpl>(connectCount++); 186 currentServer = server; 187 return test::TestInterface::Client(kj::mv(server)); 188 }; 189 190 test::TestInterface::Client client = autoReconnect(connectCounter); 191 192 auto test = [&](uint i, bool j) { 193 auto req = client.fooRequest(); 194 req.setI(i); 195 req.setJ(j); 196 return kj::str(req.send().wait(ws).getX()); 197 }; 198 199 KJ_EXPECT(connectCount == 1); 200 KJ_EXPECT(test(123, true) == "123 true 0"); 201 KJ_EXPECT(connectCount == 1); 202 203 client = lazyAutoReconnect(connectCounter); 204 KJ_EXPECT(connectCount == 1); 205 KJ_EXPECT(test(123, true) == "123 true 1"); 206 KJ_EXPECT(connectCount == 2); 207 KJ_EXPECT(test(234, false) == "234 false 1"); 208 KJ_EXPECT(connectCount == 2); 209 210 currentServer->setError(KJ_EXCEPTION(DISCONNECTED, "test1 disconnect")); 211 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test1 disconnect", test(345, true)); 212 213 // lazyAutoReconnect is only lazy on the first request, not on reconnects. 214 KJ_EXPECT(connectCount == 3); 215 KJ_EXPECT(test(456, false) == "456 false 2"); 216 KJ_EXPECT(connectCount == 3); 217 } 218 219 } // namespace 220 } // namespace _ 221 } // namespace capnp