thread_parallel_runner_internal.cc (6621B)
// Copyright (c) the JPEG XL Project Authors. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

#include "lib/threads/thread_parallel_runner_internal.h"

#include <jxl/parallel_runner.h>
#include <jxl/types.h>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>

#if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \
    defined(THREAD_SANITIZER)
#include "sanitizer/common_interface_defs.h"  // __sanitizer_print_stack_trace
#endif                                        // defined(*_SANITIZER)

namespace {

// Important: JXL_ASSERT does not guarantee running the `condition` code,
// use only for debug mode checks.

#if JXL_ENABLE_ASSERT
// Exits the program after printing a stack trace when possible.
// Declared to return bool only so it can appear in boolean contexts; it
// never actually returns (it traps/aborts below).
bool Abort() {
#if defined(ADDRESS_SANITIZER) || defined(MEMORY_SANITIZER) || \
    defined(THREAD_SANITIZER)
  // If compiled with any sanitizer print a stack trace. This call doesn't crash
  // the program, instead the trap below will crash it also allowing gdb to
  // break there.
  __sanitizer_print_stack_trace();
#endif  // defined(*_SANITIZER)

#ifdef _MSC_VER
  // MSVC has no __builtin_trap; break into the debugger, then abort so the
  // function still never returns.
  __debugbreak();
  abort();
#else
  __builtin_trap();
#endif
}
// Debug-mode assertion: evaluates `condition` and traps on failure.
#define JXL_ASSERT(condition) \
  do {                        \
    if (!(condition)) {       \
      Abort();                \
    }                         \
  } while (0)
#else
// Assertions disabled: `condition` is NOT evaluated (see note above).
#define JXL_ASSERT(condition) \
  do {                        \
  } while (0)
#endif
}  // namespace

namespace jpegxl {

// Entry point matching the JxlParallelRunner C-API signature.
//
// Calls `init(jpegxl_opaque, num_threads)` exactly once, then runs
// `func(jpegxl_opaque, task, thread)` for every task in
// [start_range, end_range), distributing tasks across the worker threads.
// Returns 0 on success, `init`'s result if it is nonzero, and -1 on misuse
// (inverted range or re-entrant invocation from within a running task).
//
// NOTE(review): StartWorkers/WorkersReadyBarrier and the kWorker* command
// constants are declared in the header, not visible here — the comments
// below describe only what this file demonstrates.
//
// static
JxlParallelRetCode ThreadParallelRunner::Runner(
    void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
    JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
  ThreadParallelRunner* self =
      static_cast<ThreadParallelRunner*>(runner_opaque);
  if (start_range > end_range) return -1;
  if (start_range == end_range) return 0;  // Empty range: nothing to do.

  // `init` is told how many distinct `thread` ids it may observe; at least 1
  // because the sequential fallback below uses thread id 0.
  int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1));
  if (ret != 0) return ret;

  // Use a sequential run when num_worker_threads_ is zero since we have no
  // worker threads.
  if (self->num_worker_threads_ == 0) {
    const size_t thread = 0;
    for (uint32_t task = start_range; task < end_range; ++task) {
      func(jpegxl_opaque, task, thread);
    }
    return 0;
  }

  // depth_ guards against re-entrant calls (e.g. `func` invoking Runner on
  // the same instance). On violation the counter is deliberately left
  // incremented, so all subsequent calls also fail fast.
  if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) {
    return -1;  // Must not re-enter.
  }

  // Pack the task range into a single 64-bit command word:
  // high 32 bits = start, low 32 bits = end (decoded in RunRange).
  const WorkerCommand worker_command =
      (static_cast<WorkerCommand>(start_range) << 32) + end_range;
  // Ensure the inputs do not result in a reserved command.
  JXL_ASSERT(worker_command != kWorkerWait);
  JXL_ASSERT(worker_command != kWorkerOnce);
  JXL_ASSERT(worker_command != kWorkerExit);

  self->data_func_ = func;
  self->jpegxl_opaque_ = jpegxl_opaque;
  self->num_reserved_.store(0, std::memory_order_relaxed);

  // Wake the workers with the packed range, then block until every worker
  // has finished and re-entered the ready state.
  self->StartWorkers(worker_command);
  self->WorkersReadyBarrier();

  // Balanced-depth check: anything other than 1 means the re-entrancy
  // invariant was broken while we were running.
  if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) {
    return -1;
  }
  return 0;
}

// Executes tasks from the range packed in `command` on behalf of one worker.
// Workers cooperatively pull chunks from the shared num_reserved_ counter
// until the range is exhausted.
//
// static
void ThreadParallelRunner::RunRange(ThreadParallelRunner* self,
                                    const WorkerCommand command,
                                    const int thread) {
  // Decode the range packed by Runner(): high word = begin, low word = end.
  const uint32_t begin = command >> 32;
  const uint32_t end = command & 0xFFFFFFFF;
  const uint32_t num_tasks = end - begin;
  const uint32_t num_worker_threads = self->num_worker_threads_;

  // OpenMP introduced several "schedule" strategies:
  // "single" (static assignment of exactly one chunk per thread): slower.
  // "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
  // "guided" (allocates k tasks, decreases k): computing k = remaining/n
  // is faster than halving k each iteration. We prefer this strategy
  // because it avoids user-specified parameters.

  for (;;) {
#if JXL_FALSE
    // dynamic
    const uint32_t my_size = std::max(num_tasks / (num_worker_threads * 4), 1);
#else
    // guided
    const uint32_t num_reserved =
        self->num_reserved_.load(std::memory_order_relaxed);
    // It is possible that more tasks are reserved than ready to run.
    const uint32_t num_remaining =
        num_tasks - std::min(num_reserved, num_tasks);
    // Chunk size shrinks as the range drains; never below one task.
    const uint32_t my_size =
        std::max(num_remaining / (num_worker_threads * 4), 1u);
#endif
    // Atomically claim [my_begin, my_end); relaxed order suffices because
    // the counter is the only shared state contended here and the
    // start/finish barriers provide the surrounding synchronization.
    const uint32_t my_begin = begin + self->num_reserved_.fetch_add(
                                          my_size, std::memory_order_relaxed);
    const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks);
    // Another thread already reserved the last task.
    if (my_begin >= my_end) {
      break;
    }
    for (uint32_t task = my_begin; task < my_end; ++task) {
      self->data_func_(self->jpegxl_opaque_, task, thread);
    }
  }
}

// Main loop of each worker thread: signal readiness, wait for a command,
// execute it, repeat until kWorkerExit.
//
// static
void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self,
                                      const int thread) {
  // Until kWorkerExit command received:
  for (;;) {
    std::unique_lock<std::mutex> lock(self->mutex_);
    // Notify main thread that this thread is ready.
    if (++self->workers_ready_ == self->num_threads_) {
      self->workers_ready_cv_.notify_one();
    }
  RESUME_WAIT:
    // Wait for a command. No predicate: a spurious wakeup is detected by
    // reading worker_start_command_ below and seeing the kWorkerWait
    // sentinel, in which case we re-wait WITHOUT incrementing workers_ready_
    // again (hence the goto rather than looping to the top).
    self->worker_start_cv_.wait(lock);
    const WorkerCommand command = self->worker_start_command_;
    switch (command) {
      case kWorkerWait:    // spurious wakeup:
        goto RESUME_WAIT;  // lock still held, avoid incrementing ready.
      case kWorkerOnce:
        // Run the user function exactly once per worker, with task == thread.
        lock.unlock();
        self->data_func_(self->jpegxl_opaque_, thread, thread);
        break;
      case kWorkerExit:
        return;  // exits thread
      default:
        // Any other value is a packed task range (see Runner).
        lock.unlock();
        RunRange(self, command, thread);
        break;
    }
  }
}

// Spawns `num_worker_threads` workers and blocks until all of them have
// reached the ready barrier. num_threads_ is at least 1 so that scheduling
// arithmetic and `init` callbacks work even in the thread-less mode.
ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads)
    : num_worker_threads_(num_worker_threads),
      num_threads_(std::max(num_worker_threads, 1)) {
  threads_.reserve(num_worker_threads_);

  // Suppress "unused-private-field" warning.
  (void)padding1;
  (void)padding2;

  // Safely handle spurious worker wakeups.
  worker_start_command_ = kWorkerWait;

  for (uint32_t i = 0; i < num_worker_threads_; ++i) {
    threads_.emplace_back(ThreadFunc, this, i);
  }

  if (num_worker_threads_ != 0) {
    WorkersReadyBarrier();
  }
}

// Orders all workers to exit and joins them; safe when there are zero
// workers (StartWorkers is skipped and the join loop is empty).
ThreadParallelRunner::~ThreadParallelRunner() {
  if (num_worker_threads_ != 0) {
    StartWorkers(kWorkerExit);
  }

  for (std::thread& thread : threads_) {
    JXL_ASSERT(thread.joinable());
    thread.join();
  }
}
}  // namespace jpegxl