duckstation

duckstation, but archived from the revision just before upstream changed it to a proprietary software project, this version is the libre one
git clone https://git.neptards.moe/u3shit/duckstation.git
Log | Files | Refs | README | LICENSE

cubeb_ringbuffer.h (15215B)


      1 /*
      2  * Copyright © 2016 Mozilla Foundation
      3  *
      4  * This program is made available under an ISC-style license.  See the
      5  * accompanying file LICENSE for details.
      6  */
      7 
      8 #ifndef CUBEB_RING_BUFFER_H
      9 #define CUBEB_RING_BUFFER_H
     10 
     11 #include "cubeb_utils.h"
     12 #include <algorithm>
     13 #include <atomic>
     14 #include <cstdint>
     15 #include <memory>
     16 #include <thread>
     17 
     18 /**
     19  * Single producer single consumer lock-free and wait-free ring buffer.
     20  *
     21  * This data structure allows producing data from one thread, and consuming it
     22  * on another thread, safely and without explicit synchronization. If used on
     23  * two threads, this data structure uses atomics for thread safety. It is
     24  * possible to disable the use of atomics at compile time and only use this data
     25  * structure on one thread.
     26  *
     27  * The role for the producer and the consumer must be constant, i.e., the
     28  * producer should always be on one thread and the consumer should always be on
     29  * another thread.
     30  *
     31  * Some words about the inner workings of this class:
     32  * - Capacity is fixed. Only one allocation is performed, in the constructor.
     33  *   When reading and writing, the return value of the method allows checking if
     34  *   the ring buffer is empty or full.
     35  * - We always keep the read index at least one element ahead of the write
     36  *   index, so we can distinguish between an empty and a full ring buffer: an
     37  *   empty ring buffer is when the write index is at the same position as the
     38  *   read index. A full buffer is when the write index is exactly one position
     39  *   before the read index.
     40  * - We synchronize updates to the read index after having read the data, and
     41  *   the write index after having written the data. This means that the each
     42  *   thread can only touch a portion of the buffer that is not touched by the
     43  *   other thread.
     44  * - Callers are expected to provide buffers. When writing to the queue,
     45  *   elements are copied into the internal storage from the buffer passed in.
     46  *   When reading from the queue, the user is expected to provide a buffer.
     47  *   Because this is a ring buffer, data might not be contiguous in memory,
     48  *   providing an external buffer to copy into is an easy way to have linear
     49  *   data for further processing.
     50  */
     51 template <typename T> class ring_buffer_base {
     52 public:
     53   /**
     54    * Constructor for a ring buffer.
     55    *
     56    * This performs an allocation, but is the only allocation that will happen
     57    * for the life time of a `ring_buffer_base`.
     58    *
     59    * @param capacity The maximum number of element this ring buffer will hold.
     60    */
     61   ring_buffer_base(int capacity)
     62       /* One more element to distinguish from empty and full buffer. */
     63       : capacity_(capacity + 1)
     64   {
     65     assert(storage_capacity() < std::numeric_limits<int>::max() / 2 &&
     66            "buffer too large for the type of index used.");
     67     assert(capacity_ > 0);
     68 
     69     data_.reset(new T[storage_capacity()]);
     70     /* If this queue is using atomics, initializing those members as the last
     71      * action in the constructor acts as a full barrier, and allow capacity() to
     72      * be thread-safe. */
     73     write_index_ = 0;
     74     read_index_ = 0;
     75   }
     76   /**
     77    * Push `count` zero or default constructed elements in the array.
     78    *
     79    * Only safely called on the producer thread.
     80    *
     81    * @param count The number of elements to enqueue.
     82    * @return The number of element enqueued.
     83    */
     84   int enqueue_default(int count) { return enqueue(nullptr, count); }
     85   /**
     86    * @brief Put an element in the queue
     87    *
     88    * Only safely called on the producer thread.
     89    *
     90    * @param element The element to put in the queue.
     91    *
     92    * @return 1 if the element was inserted, 0 otherwise.
     93    */
     94   int enqueue(T & element) { return enqueue(&element, 1); }
     95   /**
     96    * Push `count` elements in the ring buffer.
     97    *
     98    * Only safely called on the producer thread.
     99    *
    100    * @param elements a pointer to a buffer containing at least `count` elements.
    101    * If `elements` is nullptr, zero or default constructed elements are
    102    * enqueued.
    103    * @param count The number of elements to read from `elements`
    104    * @return The number of elements successfully coped from `elements` and
    105    * inserted into the ring buffer.
    106    */
    107   int enqueue(T * elements, int count)
    108   {
    109 #ifndef NDEBUG
    110     assert_correct_thread(producer_id);
    111 #endif
    112 
    113     int wr_idx = write_index_.load(std::memory_order_relaxed);
    114     int rd_idx = read_index_.load(std::memory_order_acquire);
    115 
    116     if (full_internal(rd_idx, wr_idx)) {
    117       return 0;
    118     }
    119 
    120     int to_write = std::min(available_write_internal(rd_idx, wr_idx), count);
    121 
    122     /* First part, from the write index to the end of the array. */
    123     int first_part = std::min(storage_capacity() - wr_idx, to_write);
    124     /* Second part, from the beginning of the array */
    125     int second_part = to_write - first_part;
    126 
    127     if (elements) {
    128       Copy(data_.get() + wr_idx, elements, first_part);
    129       Copy(data_.get(), elements + first_part, second_part);
    130     } else {
    131       ConstructDefault(data_.get() + wr_idx, first_part);
    132       ConstructDefault(data_.get(), second_part);
    133     }
    134 
    135     write_index_.store(increment_index(wr_idx, to_write),
    136                        std::memory_order_release);
    137 
    138     return to_write;
    139   }
    140   /**
    141    * Retrieve at most `count` elements from the ring buffer, and copy them to
    142    * `elements`, if non-null.
    143    *
    144    * Only safely called on the consumer side.
    145    *
    146    * @param elements A pointer to a buffer with space for at least `count`
    147    * elements. If `elements` is `nullptr`, `count` element will be discarded.
    148    * @param count The maximum number of elements to dequeue.
    149    * @return The number of elements written to `elements`.
    150    */
    151   int dequeue(T * elements, int count)
    152   {
    153 #ifndef NDEBUG
    154     assert_correct_thread(consumer_id);
    155 #endif
    156 
    157     int rd_idx = read_index_.load(std::memory_order_relaxed);
    158     int wr_idx = write_index_.load(std::memory_order_acquire);
    159 
    160     if (empty_internal(rd_idx, wr_idx)) {
    161       return 0;
    162     }
    163 
    164     int to_read = std::min(available_read_internal(rd_idx, wr_idx), count);
    165 
    166     int first_part = std::min(storage_capacity() - rd_idx, to_read);
    167     int second_part = to_read - first_part;
    168 
    169     if (elements) {
    170       Copy(elements, data_.get() + rd_idx, first_part);
    171       Copy(elements + first_part, data_.get(), second_part);
    172     }
    173 
    174     read_index_.store(increment_index(rd_idx, to_read),
    175                       std::memory_order_release);
    176 
    177     return to_read;
    178   }
    179   /**
    180    * Get the number of available element for consuming.
    181    *
    182    * Only safely called on the consumer thread.
    183    *
    184    * @return The number of available elements for reading.
    185    */
    186   int available_read() const
    187   {
    188 #ifndef NDEBUG
    189     assert_correct_thread(consumer_id);
    190 #endif
    191     return available_read_internal(
    192         read_index_.load(std::memory_order_relaxed),
    193         write_index_.load(std::memory_order_acquire));
    194   }
    195   /**
    196    * Get the number of available elements for consuming.
    197    *
    198    * Only safely called on the producer thread.
    199    *
    200    * @return The number of empty slots in the buffer, available for writing.
    201    */
    202   int available_write() const
    203   {
    204 #ifndef NDEBUG
    205     assert_correct_thread(producer_id);
    206 #endif
    207     return available_write_internal(
    208         read_index_.load(std::memory_order_acquire),
    209         write_index_.load(std::memory_order_relaxed));
    210   }
    211   /**
    212    * Get the total capacity, for this ring buffer.
    213    *
    214    * Can be called safely on any thread.
    215    *
    216    * @return The maximum capacity of this ring buffer.
    217    */
    218   int capacity() const { return storage_capacity() - 1; }
    219   /**
    220    * Reset the consumer and producer thread identifier, in case the thread are
    221    * being changed. This has to be externally synchronized. This is no-op when
    222    * asserts are disabled.
    223    */
    224   void reset_thread_ids()
    225   {
    226 #ifndef NDEBUG
    227     consumer_id = producer_id = std::thread::id();
    228 #endif
    229   }
    230 
    231 private:
    232   /** Return true if the ring buffer is empty.
    233    *
    234    * @param read_index the read index to consider
    235    * @param write_index the write index to consider
    236    * @return true if the ring buffer is empty, false otherwise.
    237    **/
    238   bool empty_internal(int read_index, int write_index) const
    239   {
    240     return write_index == read_index;
    241   }
    242   /** Return true if the ring buffer is full.
    243    *
    244    * This happens if the write index is exactly one element behind the read
    245    * index.
    246    *
    247    * @param read_index the read index to consider
    248    * @param write_index the write index to consider
    249    * @return true if the ring buffer is full, false otherwise.
    250    **/
    251   bool full_internal(int read_index, int write_index) const
    252   {
    253     return (write_index + 1) % storage_capacity() == read_index;
    254   }
    255   /**
    256    * Return the size of the storage. It is one more than the number of elements
    257    * that can be stored in the buffer.
    258    *
    259    * @return the number of elements that can be stored in the buffer.
    260    */
    261   int storage_capacity() const { return capacity_; }
    262   /**
    263    * Returns the number of elements available for reading.
    264    *
    265    * @return the number of available elements for reading.
    266    */
    267   int available_read_internal(int read_index, int write_index) const
    268   {
    269     if (write_index >= read_index) {
    270       return write_index - read_index;
    271     } else {
    272       return write_index + storage_capacity() - read_index;
    273     }
    274   }
    275   /**
    276    * Returns the number of empty elements, available for writing.
    277    *
    278    * @return the number of elements that can be written into the array.
    279    */
    280   int available_write_internal(int read_index, int write_index) const
    281   {
    282     /* We substract one element here to always keep at least one sample
    283      * free in the buffer, to distinguish between full and empty array. */
    284     int rv = read_index - write_index - 1;
    285     if (write_index >= read_index) {
    286       rv += storage_capacity();
    287     }
    288     return rv;
    289   }
    290   /**
    291    * Increments an index, wrapping it around the storage.
    292    *
    293    * @param index a reference to the index to increment.
    294    * @param increment the number by which `index` is incremented.
    295    * @return the new index.
    296    */
    297   int increment_index(int index, int increment) const
    298   {
    299     assert(increment >= 0);
    300     return (index + increment) % storage_capacity();
    301   }
    302   /**
    303    * @brief This allows checking that enqueue (resp. dequeue) are always called
    304    * by the right thread.
    305    *
    306    * @param id the id of the thread that has called the calling method first.
    307    */
    308 #ifndef NDEBUG
    309   static void assert_correct_thread(std::thread::id & id)
    310   {
    311     if (id == std::thread::id()) {
    312       id = std::this_thread::get_id();
    313       return;
    314     }
    315     assert(id == std::this_thread::get_id());
    316   }
    317 #endif
    318   /** Index at which the oldest element is at, in samples. */
    319   std::atomic<int> read_index_;
    320   /** Index at which to write new elements. `write_index` is always at
    321    * least one element ahead of `read_index_`. */
    322   std::atomic<int> write_index_;
    323   /** Maximum number of elements that can be stored in the ring buffer. */
    324   const int capacity_;
    325   /** Data storage */
    326   std::unique_ptr<T[]> data_;
    327 #ifndef NDEBUG
    328   /** The id of the only thread that is allowed to read from the queue. */
    329   mutable std::thread::id consumer_id;
    330   /** The id of the only thread that is allowed to write from the queue. */
    331   mutable std::thread::id producer_id;
    332 #endif
    333 };
    334 
    335 /**
    336  * Adapter for `ring_buffer_base` that exposes an interface in frames.
    337  */
    338 template <typename T> class audio_ring_buffer_base {
    339 public:
    340   /**
    341    * @brief Constructor.
    342    *
    343    * @param channel_count       Number of channels.
    344    * @param capacity_in_frames  The capacity in frames.
    345    */
    346   audio_ring_buffer_base(int channel_count, int capacity_in_frames)
    347       : channel_count(channel_count),
    348         ring_buffer(frames_to_samples(capacity_in_frames))
    349   {
    350     assert(channel_count > 0);
    351   }
    352   /**
    353    * @brief Enqueue silence.
    354    *
    355    * Only safely called on the producer thread.
    356    *
    357    * @param frame_count The number of frames of silence to enqueue.
    358    * @return  The number of frames of silence actually written to the queue.
    359    */
    360   int enqueue_default(int frame_count)
    361   {
    362     return samples_to_frames(
    363         ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));
    364   }
    365   /**
    366    * @brief Enqueue `frames_count` frames of audio.
    367    *
    368    * Only safely called from the producer thread.
    369    *
    370    * @param [in] frames If non-null, the frames to enqueue.
    371    *                    Otherwise, silent frames are enqueued.
    372    * @param frame_count The number of frames to enqueue.
    373    *
    374    * @return The number of frames enqueued
    375    */
    376 
    377   int enqueue(T * frames, int frame_count)
    378   {
    379     return samples_to_frames(
    380         ring_buffer.enqueue(frames, frames_to_samples(frame_count)));
    381   }
    382 
    383   /**
    384    * @brief Removes `frame_count` frames from the buffer, and
    385    *        write them to `frames` if it is non-null.
    386    *
    387    * Only safely called on the consumer thread.
    388    *
    389    * @param frames      If non-null, the frames are copied to `frames`.
    390    *                    Otherwise, they are dropped.
    391    * @param frame_count The number of frames to remove.
    392    *
    393    * @return  The number of frames actually dequeud.
    394    */
    395   int dequeue(T * frames, int frame_count)
    396   {
    397     return samples_to_frames(
    398         ring_buffer.dequeue(frames, frames_to_samples(frame_count)));
    399   }
    400   /**
    401    * Get the number of available frames of audio for consuming.
    402    *
    403    * Only safely called on the consumer thread.
    404    *
    405    * @return The number of available frames of audio for reading.
    406    */
    407   int available_read() const
    408   {
    409     return samples_to_frames(ring_buffer.available_read());
    410   }
    411   /**
    412    * Get the number of available frames of audio for consuming.
    413    *
    414    * Only safely called on the producer thread.
    415    *
    416    * @return The number of empty slots in the buffer, available for writing.
    417    */
    418   int available_write() const
    419   {
    420     return samples_to_frames(ring_buffer.available_write());
    421   }
    422   /**
    423    * Get the total capacity, for this ring buffer.
    424    *
    425    * Can be called safely on any thread.
    426    *
    427    * @return The maximum capacity of this ring buffer.
    428    */
    429   int capacity() const { return samples_to_frames(ring_buffer.capacity()); }
    430 
    431 private:
    432   /**
    433    * @brief Frames to samples conversion.
    434    *
    435    * @param frames The number of frames.
    436    *
    437    * @return  A number of samples.
    438    */
    439   int frames_to_samples(int frames) const { return frames * channel_count; }
    440   /**
    441    * @brief Samples to frames conversion.
    442    *
    443    * @param samples The number of samples.
    444    *
    445    * @return  A number of frames.
    446    */
    447   int samples_to_frames(int samples) const { return samples / channel_count; }
    448   /** Number of channels of audio that will stream through this ring buffer. */
    449   int channel_count;
    450   /** The underlying ring buffer that is used to store the data. */
    451   ring_buffer_base<T> ring_buffer;
    452 };
    453 
    454 /**
    455  * Lock-free instantiation of the `ring_buffer_base` type. This is safe to use
    456  * from two threads, one producer, one consumer (that never change role),
    457  * without explicit synchronization.
    458  */
    459 template <typename T> using lock_free_queue = ring_buffer_base<T>;
    460 /**
    461  * Lock-free instantiation of the `audio_ring_buffer` type. This is safe to use
    462  * from two threads, one producer, one consumer (that never change role),
    463  * without explicit synchronization.
    464  */
    465 template <typename T>
    466 using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;
    467 
    468 #endif // CUBEB_RING_BUFFER_H