capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async-queue-test.c++ (4839B)


      1 // Copyright (c) 2021 Cloudflare, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #include "async-queue.h"
     23 
     24 #include <kj/async-io.h>
     25 #include <kj/test.h>
     26 #include <kj/vector.h>
     27 
     28 namespace kj {
     29 namespace {
     30 
     31 struct QueueTest {
     32   kj::AsyncIoContext io = setupAsyncIo();
     33   ProducerConsumerQueue<size_t> queue;
     34 
     35   QueueTest() = default;
     36   QueueTest(QueueTest&&) = delete;
     37   QueueTest(const QueueTest&) = delete;
     38   QueueTest& operator=(QueueTest&&) = delete;
     39   QueueTest& operator=(const QueueTest&) = delete;
     40 
     41   struct Producer {
     42     QueueTest& test;
     43     Promise<void> promise = kj::READY_NOW;
     44 
     45     Producer(QueueTest& test): test(test) {}
     46 
     47     void push(size_t i) {
     48       auto push = [&, i]() -> Promise<void> {
     49         test.queue.push(i);
     50         return kj::READY_NOW;
     51       };
     52       promise = promise.then(kj::mv(push));
     53     }
     54   };
     55 
     56   struct Consumer {
     57     QueueTest& test;
     58     Promise<void> promise = kj::READY_NOW;
     59 
     60     Consumer(QueueTest& test): test(test) {}
     61 
     62     void pop(Vector<bool>& bits) {
     63       auto pop = [&]() {
     64         return test.queue.pop();
     65       };
     66       auto checkPop = [&](size_t j) -> Promise<void> {
     67         bits[j] = true;
     68         return kj::READY_NOW;
     69       };
     70       promise = promise.then(kj::mv(pop)).then(kj::mv(checkPop));
     71     }
     72   };
     73 };
     74 
     75 KJ_TEST("ProducerConsumerQueue with various amounts of producers and consumers") {
     76   QueueTest test;
     77 
     78   size_t constexpr kItemCount = 1000;
     79   for (auto producerCount: { 1, 5, 10 }) {
     80     for (auto consumerCount: { 1, 5, 10 }) {
     81       KJ_LOG(INFO, "Testing a new set of Producers and Consumers",  //
     82              producerCount, consumerCount, kItemCount);
     83       // Make a vector to track our entries.
     84       auto bits = Vector<bool>(kItemCount);
     85       for (auto i KJ_UNUSED : kj::zeroTo(kItemCount)) {
     86         bits.add(false);
     87       }
     88 
     89       // Make enough producers.
     90       auto producers = Vector<QueueTest::Producer>();
     91       for (auto i KJ_UNUSED : kj::zeroTo(producerCount)) {
     92         producers.add(test);
     93       }
     94 
     95       // Make enough consumers.
     96       auto consumers = Vector<QueueTest::Consumer>();
     97       for (auto i KJ_UNUSED : kj::zeroTo(consumerCount)) {
     98         consumers.add(test);
     99       }
    100 
    101       for (auto i : kj::zeroTo(kItemCount)) {
    102         // Use a producer and a consumer for each entry.
    103 
    104         auto& producer = producers[i % producerCount];
    105         producer.push(i);
    106 
    107         auto& consumer = consumers[i % consumerCount];
    108         consumer.pop(bits);
    109       }
    110 
    111       // Confirm that all entries are produced and consumed.
    112       auto promises = Vector<Promise<void>>();
    113       for (auto& producer: producers) {
    114         promises.add(kj::mv(producer.promise));
    115       }
    116       for (auto& consumer: consumers) {
    117         promises.add(kj::mv(consumer.promise));
    118       }
    119       joinPromises(promises.releaseAsArray()).wait(test.io.waitScope);
    120       for (auto i : kj::zeroTo(kItemCount)) {
    121         KJ_ASSERT(bits[i], i);
    122       }
    123     }
    124   }
    125 }
    126 
    127 KJ_TEST("ProducerConsumerQueue with rejectAll()") {
    128   QueueTest test;
    129 
    130   for (auto consumerCount: { 1, 5, 10 }) {
    131     KJ_LOG(INFO, "Testing a new set of consumers with rejection", consumerCount);
    132 
    133     // Make enough consumers.
    134     auto promises = Vector<Promise<void>>();
    135     for (auto i KJ_UNUSED : kj::zeroTo(consumerCount)) {
    136       promises.add(test.queue.pop().ignoreResult());
    137     }
    138 
    139     for (auto& promise: promises) {
    140       KJ_EXPECT(!promise.poll(test.io.waitScope), "All of our consumers should be waiting");
    141     }
    142     test.queue.rejectAll(KJ_EXCEPTION(FAILED, "Total rejection"));
    143 
    144     // We should have finished and swallowed the errors.
    145     auto promise = joinPromises(promises.releaseAsArray());
    146     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("Total rejection", promise.wait(test.io.waitScope));
    147   }
    148 }
    149 
    150 }  // namespace
    151 }  // namespace kj