cubeb_ringbuffer.h (15215B)
1 /* 2 * Copyright © 2016 Mozilla Foundation 3 * 4 * This program is made available under an ISC-style license. See the 5 * accompanying file LICENSE for details. 6 */ 7 8 #ifndef CUBEB_RING_BUFFER_H 9 #define CUBEB_RING_BUFFER_H 10 11 #include "cubeb_utils.h" 12 #include <algorithm> 13 #include <atomic> 14 #include <cstdint> 15 #include <memory> 16 #include <thread> 17 18 /** 19 * Single producer single consumer lock-free and wait-free ring buffer. 20 * 21 * This data structure allows producing data from one thread, and consuming it 22 * on another thread, safely and without explicit synchronization. If used on 23 * two threads, this data structure uses atomics for thread safety. It is 24 * possible to disable the use of atomics at compile time and only use this data 25 * structure on one thread. 26 * 27 * The role for the producer and the consumer must be constant, i.e., the 28 * producer should always be on one thread and the consumer should always be on 29 * another thread. 30 * 31 * Some words about the inner workings of this class: 32 * - Capacity is fixed. Only one allocation is performed, in the constructor. 33 * When reading and writing, the return value of the method allows checking if 34 * the ring buffer is empty or full. 35 * - We always keep the read index at least one element ahead of the write 36 * index, so we can distinguish between an empty and a full ring buffer: an 37 * empty ring buffer is when the write index is at the same position as the 38 * read index. A full buffer is when the write index is exactly one position 39 * before the read index. 40 * - We synchronize updates to the read index after having read the data, and 41 * the write index after having written the data. This means that the each 42 * thread can only touch a portion of the buffer that is not touched by the 43 * other thread. 44 * - Callers are expected to provide buffers. When writing to the queue, 45 * elements are copied into the internal storage from the buffer passed in. 46 * When reading from the queue, the user is expected to provide a buffer. 47 * Because this is a ring buffer, data might not be contiguous in memory, 48 * providing an external buffer to copy into is an easy way to have linear 49 * data for further processing. 50 */ 51 template <typename T> class ring_buffer_base { 52 public: 53 /** 54 * Constructor for a ring buffer. 55 * 56 * This performs an allocation, but is the only allocation that will happen 57 * for the life time of a `ring_buffer_base`. 58 * 59 * @param capacity The maximum number of element this ring buffer will hold. 60 */ 61 ring_buffer_base(int capacity) 62 /* One more element to distinguish from empty and full buffer. */ 63 : capacity_(capacity + 1) 64 { 65 assert(storage_capacity() < std::numeric_limits<int>::max() / 2 && 66 "buffer too large for the type of index used."); 67 assert(capacity_ > 0); 68 69 data_.reset(new T[storage_capacity()]); 70 /* If this queue is using atomics, initializing those members as the last 71 * action in the constructor acts as a full barrier, and allow capacity() to 72 * be thread-safe. */ 73 write_index_ = 0; 74 read_index_ = 0; 75 } 76 /** 77 * Push `count` zero or default constructed elements in the array. 78 * 79 * Only safely called on the producer thread. 80 * 81 * @param count The number of elements to enqueue. 82 * @return The number of element enqueued. 83 */ 84 int enqueue_default(int count) { return enqueue(nullptr, count); } 85 /** 86 * @brief Put an element in the queue 87 * 88 * Only safely called on the producer thread. 89 * 90 * @param element The element to put in the queue. 91 * 92 * @return 1 if the element was inserted, 0 otherwise. 93 */ 94 int enqueue(T & element) { return enqueue(&element, 1); } 95 /** 96 * Push `count` elements in the ring buffer. 97 * 98 * Only safely called on the producer thread. 99 * 100 * @param elements a pointer to a buffer containing at least `count` elements. 101 * If `elements` is nullptr, zero or default constructed elements are 102 * enqueued. 103 * @param count The number of elements to read from `elements` 104 * @return The number of elements successfully coped from `elements` and 105 * inserted into the ring buffer. 106 */ 107 int enqueue(T * elements, int count) 108 { 109 #ifndef NDEBUG 110 assert_correct_thread(producer_id); 111 #endif 112 113 int wr_idx = write_index_.load(std::memory_order_relaxed); 114 int rd_idx = read_index_.load(std::memory_order_acquire); 115 116 if (full_internal(rd_idx, wr_idx)) { 117 return 0; 118 } 119 120 int to_write = std::min(available_write_internal(rd_idx, wr_idx), count); 121 122 /* First part, from the write index to the end of the array. */ 123 int first_part = std::min(storage_capacity() - wr_idx, to_write); 124 /* Second part, from the beginning of the array */ 125 int second_part = to_write - first_part; 126 127 if (elements) { 128 Copy(data_.get() + wr_idx, elements, first_part); 129 Copy(data_.get(), elements + first_part, second_part); 130 } else { 131 ConstructDefault(data_.get() + wr_idx, first_part); 132 ConstructDefault(data_.get(), second_part); 133 } 134 135 write_index_.store(increment_index(wr_idx, to_write), 136 std::memory_order_release); 137 138 return to_write; 139 } 140 /** 141 * Retrieve at most `count` elements from the ring buffer, and copy them to 142 * `elements`, if non-null. 143 * 144 * Only safely called on the consumer side. 145 * 146 * @param elements A pointer to a buffer with space for at least `count` 147 * elements. If `elements` is `nullptr`, `count` element will be discarded. 148 * @param count The maximum number of elements to dequeue. 149 * @return The number of elements written to `elements`. 150 */ 151 int dequeue(T * elements, int count) 152 { 153 #ifndef NDEBUG 154 assert_correct_thread(consumer_id); 155 #endif 156 157 int rd_idx = read_index_.load(std::memory_order_relaxed); 158 int wr_idx = write_index_.load(std::memory_order_acquire); 159 160 if (empty_internal(rd_idx, wr_idx)) { 161 return 0; 162 } 163 164 int to_read = std::min(available_read_internal(rd_idx, wr_idx), count); 165 166 int first_part = std::min(storage_capacity() - rd_idx, to_read); 167 int second_part = to_read - first_part; 168 169 if (elements) { 170 Copy(elements, data_.get() + rd_idx, first_part); 171 Copy(elements + first_part, data_.get(), second_part); 172 } 173 174 read_index_.store(increment_index(rd_idx, to_read), 175 std::memory_order_release); 176 177 return to_read; 178 } 179 /** 180 * Get the number of available element for consuming. 181 * 182 * Only safely called on the consumer thread. 183 * 184 * @return The number of available elements for reading. 185 */ 186 int available_read() const 187 { 188 #ifndef NDEBUG 189 assert_correct_thread(consumer_id); 190 #endif 191 return available_read_internal( 192 read_index_.load(std::memory_order_relaxed), 193 write_index_.load(std::memory_order_acquire)); 194 } 195 /** 196 * Get the number of available elements for consuming. 197 * 198 * Only safely called on the producer thread. 199 * 200 * @return The number of empty slots in the buffer, available for writing. 201 */ 202 int available_write() const 203 { 204 #ifndef NDEBUG 205 assert_correct_thread(producer_id); 206 #endif 207 return available_write_internal( 208 read_index_.load(std::memory_order_acquire), 209 write_index_.load(std::memory_order_relaxed)); 210 } 211 /** 212 * Get the total capacity, for this ring buffer. 213 * 214 * Can be called safely on any thread. 215 * 216 * @return The maximum capacity of this ring buffer. 217 */ 218 int capacity() const { return storage_capacity() - 1; } 219 /** 220 * Reset the consumer and producer thread identifier, in case the thread are 221 * being changed. This has to be externally synchronized. This is no-op when 222 * asserts are disabled. 223 */ 224 void reset_thread_ids() 225 { 226 #ifndef NDEBUG 227 consumer_id = producer_id = std::thread::id(); 228 #endif 229 } 230 231 private: 232 /** Return true if the ring buffer is empty. 233 * 234 * @param read_index the read index to consider 235 * @param write_index the write index to consider 236 * @return true if the ring buffer is empty, false otherwise. 237 **/ 238 bool empty_internal(int read_index, int write_index) const 239 { 240 return write_index == read_index; 241 } 242 /** Return true if the ring buffer is full. 243 * 244 * This happens if the write index is exactly one element behind the read 245 * index. 246 * 247 * @param read_index the read index to consider 248 * @param write_index the write index to consider 249 * @return true if the ring buffer is full, false otherwise. 250 **/ 251 bool full_internal(int read_index, int write_index) const 252 { 253 return (write_index + 1) % storage_capacity() == read_index; 254 } 255 /** 256 * Return the size of the storage. It is one more than the number of elements 257 * that can be stored in the buffer. 258 * 259 * @return the number of elements that can be stored in the buffer. 260 */ 261 int storage_capacity() const { return capacity_; } 262 /** 263 * Returns the number of elements available for reading. 264 * 265 * @return the number of available elements for reading. 266 */ 267 int available_read_internal(int read_index, int write_index) const 268 { 269 if (write_index >= read_index) { 270 return write_index - read_index; 271 } else { 272 return write_index + storage_capacity() - read_index; 273 } 274 } 275 /** 276 * Returns the number of empty elements, available for writing. 277 * 278 * @return the number of elements that can be written into the array. 279 */ 280 int available_write_internal(int read_index, int write_index) const 281 { 282 /* We substract one element here to always keep at least one sample 283 * free in the buffer, to distinguish between full and empty array. */ 284 int rv = read_index - write_index - 1; 285 if (write_index >= read_index) { 286 rv += storage_capacity(); 287 } 288 return rv; 289 } 290 /** 291 * Increments an index, wrapping it around the storage. 292 * 293 * @param index a reference to the index to increment. 294 * @param increment the number by which `index` is incremented. 295 * @return the new index. 296 */ 297 int increment_index(int index, int increment) const 298 { 299 assert(increment >= 0); 300 return (index + increment) % storage_capacity(); 301 } 302 /** 303 * @brief This allows checking that enqueue (resp. dequeue) are always called 304 * by the right thread. 305 * 306 * @param id the id of the thread that has called the calling method first. 307 */ 308 #ifndef NDEBUG 309 static void assert_correct_thread(std::thread::id & id) 310 { 311 if (id == std::thread::id()) { 312 id = std::this_thread::get_id(); 313 return; 314 } 315 assert(id == std::this_thread::get_id()); 316 } 317 #endif 318 /** Index at which the oldest element is at, in samples. */ 319 std::atomic<int> read_index_; 320 /** Index at which to write new elements. `write_index` is always at 321 * least one element ahead of `read_index_`. */ 322 std::atomic<int> write_index_; 323 /** Maximum number of elements that can be stored in the ring buffer. */ 324 const int capacity_; 325 /** Data storage */ 326 std::unique_ptr<T[]> data_; 327 #ifndef NDEBUG 328 /** The id of the only thread that is allowed to read from the queue. */ 329 mutable std::thread::id consumer_id; 330 /** The id of the only thread that is allowed to write from the queue. */ 331 mutable std::thread::id producer_id; 332 #endif 333 }; 334 335 /** 336 * Adapter for `ring_buffer_base` that exposes an interface in frames. 337 */ 338 template <typename T> class audio_ring_buffer_base { 339 public: 340 /** 341 * @brief Constructor. 342 * 343 * @param channel_count Number of channels. 344 * @param capacity_in_frames The capacity in frames. 345 */ 346 audio_ring_buffer_base(int channel_count, int capacity_in_frames) 347 : channel_count(channel_count), 348 ring_buffer(frames_to_samples(capacity_in_frames)) 349 { 350 assert(channel_count > 0); 351 } 352 /** 353 * @brief Enqueue silence. 354 * 355 * Only safely called on the producer thread. 356 * 357 * @param frame_count The number of frames of silence to enqueue. 358 * @return The number of frames of silence actually written to the queue. 359 */ 360 int enqueue_default(int frame_count) 361 { 362 return samples_to_frames( 363 ring_buffer.enqueue(nullptr, frames_to_samples(frame_count))); 364 } 365 /** 366 * @brief Enqueue `frames_count` frames of audio. 367 * 368 * Only safely called from the producer thread. 369 * 370 * @param [in] frames If non-null, the frames to enqueue. 371 * Otherwise, silent frames are enqueued. 372 * @param frame_count The number of frames to enqueue. 373 * 374 * @return The number of frames enqueued 375 */ 376 377 int enqueue(T * frames, int frame_count) 378 { 379 return samples_to_frames( 380 ring_buffer.enqueue(frames, frames_to_samples(frame_count))); 381 } 382 383 /** 384 * @brief Removes `frame_count` frames from the buffer, and 385 * write them to `frames` if it is non-null. 386 * 387 * Only safely called on the consumer thread. 388 * 389 * @param frames If non-null, the frames are copied to `frames`. 390 * Otherwise, they are dropped. 391 * @param frame_count The number of frames to remove. 392 * 393 * @return The number of frames actually dequeud. 394 */ 395 int dequeue(T * frames, int frame_count) 396 { 397 return samples_to_frames( 398 ring_buffer.dequeue(frames, frames_to_samples(frame_count))); 399 } 400 /** 401 * Get the number of available frames of audio for consuming. 402 * 403 * Only safely called on the consumer thread. 404 * 405 * @return The number of available frames of audio for reading. 406 */ 407 int available_read() const 408 { 409 return samples_to_frames(ring_buffer.available_read()); 410 } 411 /** 412 * Get the number of available frames of audio for consuming. 413 * 414 * Only safely called on the producer thread. 415 * 416 * @return The number of empty slots in the buffer, available for writing. 417 */ 418 int available_write() const 419 { 420 return samples_to_frames(ring_buffer.available_write()); 421 } 422 /** 423 * Get the total capacity, for this ring buffer. 424 * 425 * Can be called safely on any thread. 426 * 427 * @return The maximum capacity of this ring buffer. 428 */ 429 int capacity() const { return samples_to_frames(ring_buffer.capacity()); } 430 431 private: 432 /** 433 * @brief Frames to samples conversion. 434 * 435 * @param frames The number of frames. 436 * 437 * @return A number of samples. 438 */ 439 int frames_to_samples(int frames) const { return frames * channel_count; } 440 /** 441 * @brief Samples to frames conversion. 442 * 443 * @param samples The number of samples. 444 * 445 * @return A number of frames. 446 */ 447 int samples_to_frames(int samples) const { return samples / channel_count; } 448 /** Number of channels of audio that will stream through this ring buffer. */ 449 int channel_count; 450 /** The underlying ring buffer that is used to store the data. */ 451 ring_buffer_base<T> ring_buffer; 452 }; 453 454 /** 455 * Lock-free instantiation of the `ring_buffer_base` type. This is safe to use 456 * from two threads, one producer, one consumer (that never change role), 457 * without explicit synchronization. 458 */ 459 template <typename T> using lock_free_queue = ring_buffer_base<T>; 460 /** 461 * Lock-free instantiation of the `audio_ring_buffer` type. This is safe to use 462 * from two threads, one producer, one consumer (that never change role), 463 * without explicit synchronization. 464 */ 465 template <typename T> 466 using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>; 467 468 #endif // CUBEB_RING_BUFFER_H