capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

reconnect-test.c++ (7513B)


      1 // Copyright (c) 2020 Cloudflare, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #include "reconnect.h"
     23 #include "test-util.h"
     24 #include <kj/debug.h>
     25 #include <kj/test.h>
     26 #include <kj/async-io.h>
     27 #include "rpc-twoparty.h"
     28 
     29 namespace capnp {
     30 namespace _ {
     31 namespace {
     32 
     33 class TestInterfaceImpl final: public test::TestInterface::Server {
     34 public:
     35   TestInterfaceImpl(uint generation): generation(generation) {}
     36 
     37   void setError(kj::Exception e) {
     38     error = kj::mv(e);
     39   }
     40 
     41   kj::Own<kj::PromiseFulfiller<void>> block() {
     42     auto paf = kj::newPromiseAndFulfiller<void>();
     43     blocker = paf.promise.fork();
     44     return kj::mv(paf.fulfiller);
     45   }
     46 
     47 protected:
     48   kj::Promise<void> foo(FooContext context) override {
     49     KJ_IF_MAYBE(e, error) {
     50       return kj::cp(*e);
     51     }
     52     auto params = context.getParams();
     53     context.initResults().setX(kj::str(params.getI(), ' ', params.getJ(), ' ', generation));
     54     return blocker.addBranch();
     55   }
     56 
     57 private:
     58   uint generation;
     59   kj::Maybe<kj::Exception> error;
     60   kj::ForkedPromise<void> blocker = kj::Promise<void>(kj::READY_NOW).fork();
     61 };
     62 
     63 void doAutoReconnectTest(kj::WaitScope& ws,
     64     kj::Function<test::TestInterface::Client(test::TestInterface::Client)> wrapClient) {
     65   TestInterfaceImpl* currentServer = nullptr;
     66   uint connectCount = 0;
     67 
     68   test::TestInterface::Client client = wrapClient(autoReconnect([&]() {
     69     auto server = kj::heap<TestInterfaceImpl>(connectCount++);
     70     currentServer = server;
     71     return test::TestInterface::Client(kj::mv(server));
     72   }));
     73 
     74   auto testPromise = [&](uint i, bool j) {
     75     auto req = client.fooRequest();
     76     req.setI(i);
     77     req.setJ(j);
     78     return req.send();
     79   };
     80 
     81   auto test = [&](uint i, bool j) {
     82     return kj::str(testPromise(i, j).wait(ws).getX());
     83   };
     84 
     85   KJ_EXPECT(test(123, true) == "123 true 0");
     86 
     87   currentServer->setError(KJ_EXCEPTION(DISCONNECTED, "test1 disconnect"));
     88   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test1 disconnect", test(456, true));
     89 
     90   KJ_EXPECT(test(789, false) == "789 false 1");
     91   KJ_EXPECT(test(21, true) == "21 true 1");
     92 
     93   {
     94     // We cause two disconnect promises to be thrown concurrently. This should only cause the
     95     // reconnector to reconnect once, not twice.
     96     auto fulfiller = currentServer->block();
     97     auto promise1 = testPromise(32, false);
     98     auto promise2 = testPromise(43, true);
     99     KJ_EXPECT(!promise1.poll(ws));
    100     KJ_EXPECT(!promise2.poll(ws));
    101     fulfiller->reject(KJ_EXCEPTION(DISCONNECTED, "test2 disconnect"));
    102     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test2 disconnect", promise1.wait(ws));
    103     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test2 disconnect", promise2.wait(ws));
    104   }
    105 
    106   KJ_EXPECT(test(43, false) == "43 false 2");
    107 
    108   // Start a couple calls that will block at the server end, plus an unsent request.
    109   auto fulfiller = currentServer->block();
    110 
    111   auto promise1 = testPromise(1212, true);
    112   auto promise2 = testPromise(3434, false);
    113   auto req3 = client.fooRequest();
    114   req3.setI(5656);
    115   req3.setJ(true);
    116   KJ_EXPECT(!promise1.poll(ws));
    117   KJ_EXPECT(!promise2.poll(ws));
    118 
    119   // Now force a reconnect.
    120   currentServer->setError(KJ_EXCEPTION(DISCONNECTED, "test3 disconnect"));
    121 
    122   // Initiate a request that will fail with DISCONNECTED.
    123   auto promise4 = testPromise(7878, false);
    124 
    125   // And throw away our capability entirely, just to make sure that anyone who needs it is holding
    126   // onto their own ref.
    127   client = nullptr;
    128 
    129   // Everything we initiated should still finish.
    130   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test3 disconnect", promise4.wait(ws));
    131 
    132   // Send the request which we created before the disconnect. There are two behaviors we accept
    133   // as correct here: it may throw the disconnect exception, or it may automatically redirect to
    134   // the newly-reconnected destination.
    135   req3.send().then([](Response<test::TestInterface::FooResults> resp) {
    136     KJ_EXPECT(resp.getX() == "5656 true 3");
    137   }, [](kj::Exception e) {
    138     KJ_EXPECT(e.getDescription().endsWith("test3 disconnect"));
    139   }).wait(ws);
    140 
    141   KJ_EXPECT(!promise1.poll(ws));
    142   KJ_EXPECT(!promise2.poll(ws));
    143   fulfiller->fulfill();
    144   KJ_EXPECT(promise1.wait(ws).getX() == "1212 true 2");
    145   KJ_EXPECT(promise2.wait(ws).getX() == "3434 false 2");
    146 }
    147 
    148 KJ_TEST("autoReconnect() direct call (exercises newCall() / RequestHook)") {
    149   kj::EventLoop loop;
    150   kj::WaitScope ws(loop);
    151 
    152   doAutoReconnectTest(ws, [](auto c) {return kj::mv(c);});
    153 }
    154 
    155 KJ_TEST("autoReconnect() through RPC (exercises call() / CallContextHook)") {
    156   kj::EventLoop loop;
    157   kj::WaitScope ws(loop);
    158 
    159   auto paf = kj::newPromiseAndFulfiller<test::TestInterface::Client>();
    160 
    161   auto pipe = kj::newTwoWayPipe();
    162   TwoPartyClient client(*pipe.ends[0]);
    163   TwoPartyClient server(*pipe.ends[1], kj::mv(paf.promise), rpc::twoparty::Side::SERVER);
    164 
    165   doAutoReconnectTest(ws, [&](test::TestInterface::Client c) {
    166     paf.fulfiller->fulfill(kj::mv(c));
    167     return client.bootstrap().castAs<test::TestInterface>();
    168   });
    169 }
    170 
    171 KJ_TEST("lazyAutoReconnect() direct call (exercises newCall() / RequestHook)") {
    172   kj::EventLoop loop;
    173   kj::WaitScope ws(loop);
    174 
    175   doAutoReconnectTest(ws, [](auto c) {return kj::mv(c);});
    176 }
    177 
    178 KJ_TEST("lazyAutoReconnect() initialies lazily") {
    179   kj::EventLoop loop;
    180   kj::WaitScope ws(loop);
    181 
    182   int connectCount = 0;
    183   TestInterfaceImpl* currentServer = nullptr;
    184   auto connectCounter = [&]() {
    185     auto server = kj::heap<TestInterfaceImpl>(connectCount++);
    186     currentServer = server;
    187     return test::TestInterface::Client(kj::mv(server));
    188   };
    189 
    190   test::TestInterface::Client client = autoReconnect(connectCounter);
    191 
    192   auto test = [&](uint i, bool j) {
    193     auto req = client.fooRequest();
    194     req.setI(i);
    195     req.setJ(j);
    196     return kj::str(req.send().wait(ws).getX());
    197   };
    198 
    199   KJ_EXPECT(connectCount == 1);
    200   KJ_EXPECT(test(123, true) == "123 true 0");
    201   KJ_EXPECT(connectCount == 1);
    202 
    203   client = lazyAutoReconnect(connectCounter);
    204   KJ_EXPECT(connectCount == 1);
    205   KJ_EXPECT(test(123, true) == "123 true 1");
    206   KJ_EXPECT(connectCount == 2);
    207   KJ_EXPECT(test(234, false) == "234 false 1");
    208   KJ_EXPECT(connectCount == 2);
    209 
    210   currentServer->setError(KJ_EXCEPTION(DISCONNECTED, "test1 disconnect"));
    211   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("test1 disconnect", test(345, true));
    212 
    213   // lazyAutoReconnect is only lazy on the first request, not on reconnects.
    214   KJ_EXPECT(connectCount == 3);
    215   KJ_EXPECT(test(456, false) == "456 false 2");
    216   KJ_EXPECT(connectCount == 3);
    217 }
    218 
    219 }  // namespace
    220 }  // namespace _
    221 }  // namespace capnp