capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async-queue.h (4166B)


      1 // Copyright (c) 2021 Cloudflare, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #pragma once
     23 
     24 #include "async.h"
     25 #include "common.h"
     26 #include "debug.h"
     27 #include "list.h"
     28 #include "memory.h"
     29 
     30 #include <list>
     31 
     32 KJ_BEGIN_HEADER
     33 
     34 namespace kj {
     35 
     36 template <typename T>
     37 class WaiterQueue {
     38 public:
     39   // A WaiterQueue creates Nodes that blend newAdaptedPromise<T, Adaptor> and List<Node>.
     40 
     41   WaiterQueue() = default;
     42   KJ_DISALLOW_COPY(WaiterQueue);
     43 
     44   Promise<T> wait() {
     45     return newAdaptedPromise<T, Node>(queue);
     46   }
     47 
     48   void fulfill(T&& value) {
     49     KJ_IREQUIRE(!empty());
     50 
     51     auto& node = static_cast<Node&>(queue.front());
     52     node.fulfiller.fulfill(kj::mv(value));
     53     node.remove();
     54   }
     55 
     56   void reject(Exception&& exception) {
     57     KJ_IREQUIRE(!empty());
     58 
     59     auto& node = static_cast<Node&>(queue.front());
     60     node.fulfiller.reject(kj::mv(exception));
     61     node.remove();
     62   }
     63 
     64   bool empty() const {
     65     return queue.empty();
     66   }
     67 
     68 private:
     69   struct BaseNode {
     70     // This is a separate structure because List requires a predefined memory layout but
     71     // newAdaptedPromise() only provides access to the Adaptor type in the ctor.
     72 
     73     BaseNode(PromiseFulfiller<T>& fulfiller): fulfiller(fulfiller) {}
     74 
     75     PromiseFulfiller<T>& fulfiller;
     76     ListLink<BaseNode> link;
     77   };
     78 
     79   using Queue = List<BaseNode, &BaseNode::link>;
     80 
     81   struct Node: public BaseNode {
     82     Node(PromiseFulfiller<T>& fulfiller, Queue& queue): BaseNode(fulfiller), queue(queue) {
     83       queue.add(*this);
     84     }
     85 
     86     ~Node() noexcept(false) {
     87       // When the associated Promise is destructed, so is the Node thus we should leave the queue.
     88       remove();
     89     }
     90 
     91     void remove() {
     92       if(BaseNode::link.isLinked()){
     93         queue.remove(*this);
     94       }
     95     }
     96 
     97     Queue& queue;
     98   };
     99 
    100   Queue queue;
    101 };
    102 
    103 template <typename T>
    104 class ProducerConsumerQueue {
    105   // ProducerConsumerQueue is an async FIFO queue.
    106 
    107 public:
    108   void push(T v) {
    109     // Push an existing value onto the queue.
    110 
    111     if (!waiters.empty()) {
    112       // We have at least one waiter, give the value to the oldest.
    113       KJ_IASSERT(values.empty());
    114 
    115       // Fulfill the first waiter and return without store our value.
    116       waiters.fulfill(kj::mv(v));
    117     } else {
    118       // We don't have any waiters, store the value.
    119       values.push_front(kj::mv(v));
    120     }
    121   }
    122 
    123   void rejectAll(Exception e) {
    124     // Reject all waiters with a given exception.
    125 
    126     while (!waiters.empty()) {
    127       auto newE = Exception(e);
    128       waiters.reject(kj::mv(newE));
    129     }
    130   }
    131 
    132   Promise<T> pop() {
    133     // Eventually pop a value from the queue.
    134     // Note that if your sinks lag your sources, the promise will always be ready.
    135 
    136     if (!values.empty()) {
    137       // We have at least one value, get the oldest.
    138       KJ_IASSERT(waiters.empty());
    139 
    140       auto value = kj::mv(values.back());
    141       values.pop_back();
    142       return kj::mv(value);
    143     } else {
    144       // We don't have any values, add ourselves to the waiting queue.
    145       return waiters.wait();
    146     }
    147   }
    148 
    149 private:
    150   std::list<T> values;
    151   WaiterQueue<T> waiters;
    152 };
    153 
    154 }  // namespace kj
    155 
    156 KJ_END_HEADER