async-queue-test.c++ (4839B)
1 // Copyright (c) 2021 Cloudflare, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #include "async-queue.h" 23 24 #include <kj/async-io.h> 25 #include <kj/test.h> 26 #include <kj/vector.h> 27 28 namespace kj { 29 namespace { 30 31 struct QueueTest { 32 kj::AsyncIoContext io = setupAsyncIo(); 33 ProducerConsumerQueue<size_t> queue; 34 35 QueueTest() = default; 36 QueueTest(QueueTest&&) = delete; 37 QueueTest(const QueueTest&) = delete; 38 QueueTest& operator=(QueueTest&&) = delete; 39 QueueTest& operator=(const QueueTest&) = delete; 40 41 struct Producer { 42 QueueTest& test; 43 Promise<void> promise = kj::READY_NOW; 44 45 Producer(QueueTest& test): test(test) {} 46 47 void push(size_t i) { 48 auto push = [&, i]() -> Promise<void> { 49 test.queue.push(i); 50 return kj::READY_NOW; 51 }; 52 promise = promise.then(kj::mv(push)); 53 } 54 }; 55 56 struct Consumer { 57 QueueTest& test; 58 Promise<void> promise = kj::READY_NOW; 59 60 Consumer(QueueTest& test): test(test) {} 61 62 void pop(Vector<bool>& bits) { 63 auto pop = [&]() { 64 return test.queue.pop(); 65 }; 66 auto checkPop = [&](size_t j) -> Promise<void> { 67 bits[j] = true; 68 return kj::READY_NOW; 69 }; 70 promise = promise.then(kj::mv(pop)).then(kj::mv(checkPop)); 71 } 72 }; 73 }; 74 75 KJ_TEST("ProducerConsumerQueue with various amounts of producers and consumers") { 76 QueueTest test; 77 78 size_t constexpr kItemCount = 1000; 79 for (auto producerCount: { 1, 5, 10 }) { 80 for (auto consumerCount: { 1, 5, 10 }) { 81 KJ_LOG(INFO, "Testing a new set of Producers and Consumers", // 82 producerCount, consumerCount, kItemCount); 83 // Make a vector to track our entries. 84 auto bits = Vector<bool>(kItemCount); 85 for (auto i KJ_UNUSED : kj::zeroTo(kItemCount)) { 86 bits.add(false); 87 } 88 89 // Make enough producers. 90 auto producers = Vector<QueueTest::Producer>(); 91 for (auto i KJ_UNUSED : kj::zeroTo(producerCount)) { 92 producers.add(test); 93 } 94 95 // Make enough consumers. 96 auto consumers = Vector<QueueTest::Consumer>(); 97 for (auto i KJ_UNUSED : kj::zeroTo(consumerCount)) { 98 consumers.add(test); 99 } 100 101 for (auto i : kj::zeroTo(kItemCount)) { 102 // Use a producer and a consumer for each entry. 103 104 auto& producer = producers[i % producerCount]; 105 producer.push(i); 106 107 auto& consumer = consumers[i % consumerCount]; 108 consumer.pop(bits); 109 } 110 111 // Confirm that all entries are produced and consumed. 112 auto promises = Vector<Promise<void>>(); 113 for (auto& producer: producers) { 114 promises.add(kj::mv(producer.promise)); 115 } 116 for (auto& consumer: consumers) { 117 promises.add(kj::mv(consumer.promise)); 118 } 119 joinPromises(promises.releaseAsArray()).wait(test.io.waitScope); 120 for (auto i : kj::zeroTo(kItemCount)) { 121 KJ_ASSERT(bits[i], i); 122 } 123 } 124 } 125 } 126 127 KJ_TEST("ProducerConsumerQueue with rejectAll()") { 128 QueueTest test; 129 130 for (auto consumerCount: { 1, 5, 10 }) { 131 KJ_LOG(INFO, "Testing a new set of consumers with rejection", consumerCount); 132 133 // Make enough consumers. 134 auto promises = Vector<Promise<void>>(); 135 for (auto i KJ_UNUSED : kj::zeroTo(consumerCount)) { 136 promises.add(test.queue.pop().ignoreResult()); 137 } 138 139 for (auto& promise: promises) { 140 KJ_EXPECT(!promise.poll(test.io.waitScope), "All of our consumers should be waiting"); 141 } 142 test.queue.rejectAll(KJ_EXCEPTION(FAILED, "Total rejection")); 143 144 // We should have finished and swallowed the errors. 145 auto promise = joinPromises(promises.releaseAsArray()); 146 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("Total rejection", promise.wait(test.io.waitScope)); 147 } 148 } 149 150 } // namespace 151 } // namespace kj