libjxl

FORK: libjxl patches used on blog
git clone https://git.neptards.moe/blog/libjxl.git
Log | Files | Refs | Submodules | README | LICENSE

thread_parallel_runner_internal.cc (6621B)


      1 // Copyright (c) the JPEG XL Project Authors. All rights reserved.
      2 //
      3 // Use of this source code is governed by a BSD-style
      4 // license that can be found in the LICENSE file.
      5 
      6 #include "lib/threads/thread_parallel_runner_internal.h"
      7 
      8 #include <jxl/parallel_runner.h>
      9 #include <jxl/types.h>
     10 
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <mutex>
#include <thread>
     17 
     18 #if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \
     19     defined(THREAD_SANITIZER)
     20 #include "sanitizer/common_interface_defs.h"  // __sanitizer_print_stack_trace
     21 #endif                                        // defined(*_SANITIZER)
     22 
     23 namespace {
     24 
     25 // Important: JXL_ASSERT does not guarantee running the `condition` code,
     26 // use only for debug mode checks.
     27 
     28 #if JXL_ENABLE_ASSERT
     29 // Exits the program after printing a stack trace when possible.
     30 bool Abort() {
     31 #if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \
     32     defined(THREAD_SANITIZER)
     33   // If compiled with any sanitizer print a stack trace. This call doesn't crash
     34   // the program, instead the trap below will crash it also allowing gdb to
     35   // break there.
     36   __sanitizer_print_stack_trace();
     37 #endif  // defined(*_SANITIZER)
     38 
     39 #ifdef _MSC_VER
     40   __debugbreak();
     41   abort();
     42 #else
     43   __builtin_trap();
     44 #endif
     45 }
     46 #define JXL_ASSERT(condition) \
     47   do {                        \
     48     if (!(condition)) {       \
     49       Abort();                \
     50     }                         \
     51   } while (0)
     52 #else
     53 #define JXL_ASSERT(condition) \
     54   do {                        \
     55   } while (0)
     56 #endif
     57 }  // namespace
     58 
     59 namespace jpegxl {
     60 
     61 // static
     62 JxlParallelRetCode ThreadParallelRunner::Runner(
     63     void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
     64     JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
     65   ThreadParallelRunner* self =
     66       static_cast<ThreadParallelRunner*>(runner_opaque);
     67   if (start_range > end_range) return -1;
     68   if (start_range == end_range) return 0;
     69 
     70   int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1));
     71   if (ret != 0) return ret;
     72 
     73   // Use a sequential run when num_worker_threads_ is zero since we have no
     74   // worker threads.
     75   if (self->num_worker_threads_ == 0) {
     76     const size_t thread = 0;
     77     for (uint32_t task = start_range; task < end_range; ++task) {
     78       func(jpegxl_opaque, task, thread);
     79     }
     80     return 0;
     81   }
     82 
     83   if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) {
     84     return -1;  // Must not re-enter.
     85   }
     86 
     87   const WorkerCommand worker_command =
     88       (static_cast<WorkerCommand>(start_range) << 32) + end_range;
     89   // Ensure the inputs do not result in a reserved command.
     90   JXL_ASSERT(worker_command != kWorkerWait);
     91   JXL_ASSERT(worker_command != kWorkerOnce);
     92   JXL_ASSERT(worker_command != kWorkerExit);
     93 
     94   self->data_func_ = func;
     95   self->jpegxl_opaque_ = jpegxl_opaque;
     96   self->num_reserved_.store(0, std::memory_order_relaxed);
     97 
     98   self->StartWorkers(worker_command);
     99   self->WorkersReadyBarrier();
    100 
    101   if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) {
    102     return -1;
    103   }
    104   return 0;
    105 }
    106 
    107 // static
    108 void ThreadParallelRunner::RunRange(ThreadParallelRunner* self,
    109                                     const WorkerCommand command,
    110                                     const int thread) {
    111   const uint32_t begin = command >> 32;
    112   const uint32_t end = command & 0xFFFFFFFF;
    113   const uint32_t num_tasks = end - begin;
    114   const uint32_t num_worker_threads = self->num_worker_threads_;
    115 
    116   // OpenMP introduced several "schedule" strategies:
    117   // "single" (static assignment of exactly one chunk per thread): slower.
    118   // "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
    119   // "guided" (allocates k tasks, decreases k): computing k = remaining/n
    120   //   is faster than halving k each iteration. We prefer this strategy
    121   //   because it avoids user-specified parameters.
    122 
    123   for (;;) {
    124 #if JXL_FALSE
    125     // dynamic
    126     const uint32_t my_size = std::max(num_tasks / (num_worker_threads * 4), 1);
    127 #else
    128     // guided
    129     const uint32_t num_reserved =
    130         self->num_reserved_.load(std::memory_order_relaxed);
    131     // It is possible that more tasks are reserved than ready to run.
    132     const uint32_t num_remaining =
    133         num_tasks - std::min(num_reserved, num_tasks);
    134     const uint32_t my_size =
    135         std::max(num_remaining / (num_worker_threads * 4), 1u);
    136 #endif
    137     const uint32_t my_begin = begin + self->num_reserved_.fetch_add(
    138                                           my_size, std::memory_order_relaxed);
    139     const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks);
    140     // Another thread already reserved the last task.
    141     if (my_begin >= my_end) {
    142       break;
    143     }
    144     for (uint32_t task = my_begin; task < my_end; ++task) {
    145       self->data_func_(self->jpegxl_opaque_, task, thread);
    146     }
    147   }
    148 }
    149 
    150 // static
    151 void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self,
    152                                       const int thread) {
    153   // Until kWorkerExit command received:
    154   for (;;) {
    155     std::unique_lock<std::mutex> lock(self->mutex_);
    156     // Notify main thread that this thread is ready.
    157     if (++self->workers_ready_ == self->num_threads_) {
    158       self->workers_ready_cv_.notify_one();
    159     }
    160   RESUME_WAIT:
    161     // Wait for a command.
    162     self->worker_start_cv_.wait(lock);
    163     const WorkerCommand command = self->worker_start_command_;
    164     switch (command) {
    165       case kWorkerWait:    // spurious wakeup:
    166         goto RESUME_WAIT;  // lock still held, avoid incrementing ready.
    167       case kWorkerOnce:
    168         lock.unlock();
    169         self->data_func_(self->jpegxl_opaque_, thread, thread);
    170         break;
    171       case kWorkerExit:
    172         return;  // exits thread
    173       default:
    174         lock.unlock();
    175         RunRange(self, command, thread);
    176         break;
    177     }
    178   }
    179 }
    180 
    181 ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads)
    182     : num_worker_threads_(num_worker_threads),
    183       num_threads_(std::max(num_worker_threads, 1)) {
    184   threads_.reserve(num_worker_threads_);
    185 
    186   // Suppress "unused-private-field" warning.
    187   (void)padding1;
    188   (void)padding2;
    189 
    190   // Safely handle spurious worker wakeups.
    191   worker_start_command_ = kWorkerWait;
    192 
    193   for (uint32_t i = 0; i < num_worker_threads_; ++i) {
    194     threads_.emplace_back(ThreadFunc, this, i);
    195   }
    196 
    197   if (num_worker_threads_ != 0) {
    198     WorkersReadyBarrier();
    199   }
    200 }
    201 
    202 ThreadParallelRunner::~ThreadParallelRunner() {
    203   if (num_worker_threads_ != 0) {
    204     StartWorkers(kWorkerExit);
    205   }
    206 
    207   for (std::thread& thread : threads_) {
    208     JXL_ASSERT(thread.joinable());
    209     thread.join();
    210   }
    211 }
    212 }  // namespace jpegxl