async-queue.h (4166B)
1 // Copyright (c) 2021 Cloudflare, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #pragma once 23 24 #include "async.h" 25 #include "common.h" 26 #include "debug.h" 27 #include "list.h" 28 #include "memory.h" 29 30 #include <list> 31 32 KJ_BEGIN_HEADER 33 34 namespace kj { 35 36 template <typename T> 37 class WaiterQueue { 38 public: 39 // A WaiterQueue creates Nodes that blend newAdaptedPromise<T, Adaptor> and List<Node>. 40 41 WaiterQueue() = default; 42 KJ_DISALLOW_COPY(WaiterQueue); 43 44 Promise<T> wait() { 45 return newAdaptedPromise<T, Node>(queue); 46 } 47 48 void fulfill(T&& value) { 49 KJ_IREQUIRE(!empty()); 50 51 auto& node = static_cast<Node&>(queue.front()); 52 node.fulfiller.fulfill(kj::mv(value)); 53 node.remove(); 54 } 55 56 void reject(Exception&& exception) { 57 KJ_IREQUIRE(!empty()); 58 59 auto& node = static_cast<Node&>(queue.front()); 60 node.fulfiller.reject(kj::mv(exception)); 61 node.remove(); 62 } 63 64 bool empty() const { 65 return queue.empty(); 66 } 67 68 private: 69 struct BaseNode { 70 // This is a separate structure because List requires a predefined memory layout but 71 // newAdaptedPromise() only provides access to the Adaptor type in the ctor. 72 73 BaseNode(PromiseFulfiller<T>& fulfiller): fulfiller(fulfiller) {} 74 75 PromiseFulfiller<T>& fulfiller; 76 ListLink<BaseNode> link; 77 }; 78 79 using Queue = List<BaseNode, &BaseNode::link>; 80 81 struct Node: public BaseNode { 82 Node(PromiseFulfiller<T>& fulfiller, Queue& queue): BaseNode(fulfiller), queue(queue) { 83 queue.add(*this); 84 } 85 86 ~Node() noexcept(false) { 87 // When the associated Promise is destructed, so is the Node thus we should leave the queue. 88 remove(); 89 } 90 91 void remove() { 92 if(BaseNode::link.isLinked()){ 93 queue.remove(*this); 94 } 95 } 96 97 Queue& queue; 98 }; 99 100 Queue queue; 101 }; 102 103 template <typename T> 104 class ProducerConsumerQueue { 105 // ProducerConsumerQueue is an async FIFO queue. 106 107 public: 108 void push(T v) { 109 // Push an existing value onto the queue. 110 111 if (!waiters.empty()) { 112 // We have at least one waiter, give the value to the oldest. 113 KJ_IASSERT(values.empty()); 114 115 // Fulfill the first waiter and return without store our value. 116 waiters.fulfill(kj::mv(v)); 117 } else { 118 // We don't have any waiters, store the value. 119 values.push_front(kj::mv(v)); 120 } 121 } 122 123 void rejectAll(Exception e) { 124 // Reject all waiters with a given exception. 125 126 while (!waiters.empty()) { 127 auto newE = Exception(e); 128 waiters.reject(kj::mv(newE)); 129 } 130 } 131 132 Promise<T> pop() { 133 // Eventually pop a value from the queue. 134 // Note that if your sinks lag your sources, the promise will always be ready. 135 136 if (!values.empty()) { 137 // We have at least one value, get the oldest. 138 KJ_IASSERT(waiters.empty()); 139 140 auto value = kj::mv(values.back()); 141 values.pop_back(); 142 return kj::mv(value); 143 } else { 144 // We don't have any values, add ourselves to the waiting queue. 145 return waiters.wait(); 146 } 147 } 148 149 private: 150 std::list<T> values; 151 WaiterQueue<T> waiters; 152 }; 153 154 } // namespace kj 155 156 KJ_END_HEADER