cdrom_async_reader.cpp (10224B)
1 // SPDX-FileCopyrightText: 2019-2022 Connor McLaughlin <stenzek@gmail.com> 2 // SPDX-License-Identifier: (GPL-3.0 OR CC-BY-NC-ND-4.0) 3 4 #include "cdrom_async_reader.h" 5 #include "common/assert.h" 6 #include "common/log.h" 7 #include "common/timer.h" 8 Log_SetChannel(CDROMAsyncReader); 9 10 CDROMAsyncReader::CDROMAsyncReader() = default; 11 12 CDROMAsyncReader::~CDROMAsyncReader() 13 { 14 StopThread(); 15 } 16 17 void CDROMAsyncReader::StartThread(u32 readahead_count) 18 { 19 if (IsUsingThread()) 20 StopThread(); 21 22 m_buffers.clear(); 23 m_buffers.resize(readahead_count); 24 EmptyBuffers(); 25 26 m_shutdown_flag.store(false); 27 m_read_thread = std::thread(&CDROMAsyncReader::WorkerThreadEntryPoint, this); 28 INFO_LOG("Read thread started with readahead of {} sectors", readahead_count); 29 } 30 31 void CDROMAsyncReader::StopThread() 32 { 33 if (!IsUsingThread()) 34 return; 35 36 { 37 std::unique_lock<std::mutex> lock(m_mutex); 38 m_shutdown_flag.store(true); 39 m_do_read_cv.notify_one(); 40 } 41 42 m_read_thread.join(); 43 EmptyBuffers(); 44 m_buffers.clear(); 45 } 46 47 void CDROMAsyncReader::SetMedia(std::unique_ptr<CDImage> media) 48 { 49 if (IsUsingThread()) 50 CancelReadahead(); 51 52 m_media = std::move(media); 53 } 54 55 std::unique_ptr<CDImage> CDROMAsyncReader::RemoveMedia() 56 { 57 if (IsUsingThread()) 58 CancelReadahead(); 59 60 return std::move(m_media); 61 } 62 63 bool CDROMAsyncReader::Precache(ProgressCallback* callback) 64 { 65 WaitForIdle(); 66 67 std::unique_lock lock(m_mutex); 68 if (!m_media) 69 return false; 70 else if (m_media->IsPrecached()) 71 return true; 72 73 EmptyBuffers(); 74 75 const CDImage::PrecacheResult res = m_media->Precache(callback); 76 if (res == CDImage::PrecacheResult::Unsupported) 77 { 78 // fall back to copy precaching 79 std::unique_ptr<CDImage> memory_image = CDImage::CreateMemoryImage(m_media.get(), callback); 80 if (memory_image) 81 { 82 const CDImage::LBA lba = m_media->GetPositionOnDisc(); 83 if (!memory_image->Seek(lba)) [[unlikely]] 84 { 85 ERROR_LOG("Failed to seek to LBA {} in memory image", lba); 86 return false; 87 } 88 89 m_media.reset(); 90 m_media = std::move(memory_image); 91 return true; 92 } 93 else 94 { 95 return false; 96 } 97 } 98 99 return (res == CDImage::PrecacheResult::Success); 100 } 101 102 void CDROMAsyncReader::QueueReadSector(CDImage::LBA lba) 103 { 104 if (!IsUsingThread()) 105 { 106 ReadSectorNonThreaded(lba); 107 return; 108 } 109 110 const u32 buffer_count = m_buffer_count.load(); 111 if (buffer_count > 0) 112 { 113 // don't re-read the same sector if it was the last one we read 114 // the CDC code does this when seeking->reading 115 const u32 buffer_front = m_buffer_front.load(); 116 if (m_buffers[buffer_front].lba == lba) 117 { 118 DEBUG_LOG("Skipping re-reading same sector {}", lba); 119 return; 120 } 121 122 // did we readahead to the correct sector? 123 const u32 next_buffer = (buffer_front + 1) % static_cast<u32>(m_buffers.size()); 124 if (m_buffer_count > 1 && m_buffers[next_buffer].lba == lba) 125 { 126 // great, don't need a seek, but still kick the thread to start reading ahead again 127 DEBUG_LOG("Readahead buffer hit for sector {}", lba); 128 m_buffer_front.store(next_buffer); 129 m_buffer_count.fetch_sub(1); 130 m_can_readahead.store(true); 131 m_do_read_cv.notify_one(); 132 return; 133 } 134 } 135 136 // we need to toss away our readahead and start fresh 137 DEBUG_LOG("Readahead buffer miss, queueing seek to {}", lba); 138 std::unique_lock<std::mutex> lock(m_mutex); 139 m_next_position_set.store(true); 140 m_next_position = lba; 141 m_do_read_cv.notify_one(); 142 } 143 144 bool CDROMAsyncReader::ReadSectorUncached(CDImage::LBA lba, CDImage::SubChannelQ* subq, SectorBuffer* data) 145 { 146 if (!IsUsingThread()) 147 return InternalReadSectorUncached(lba, subq, data); 148 149 std::unique_lock lock(m_mutex); 150 151 // wait until the read thread is idle 152 m_notify_read_complete_cv.wait(lock, [this]() { return !m_is_reading.load(); }); 153 154 // read while the lock is held so it has to wait 155 const CDImage::LBA prev_lba = m_media->GetPositionOnDisc(); 156 const bool result = InternalReadSectorUncached(lba, subq, data); 157 if (!m_media->Seek(prev_lba)) [[unlikely]] 158 { 159 ERROR_LOG("Failed to re-seek to cached position {}", prev_lba); 160 m_can_readahead.store(false); 161 } 162 163 return result; 164 } 165 166 bool CDROMAsyncReader::InternalReadSectorUncached(CDImage::LBA lba, CDImage::SubChannelQ* subq, SectorBuffer* data) 167 { 168 if (m_media->GetPositionOnDisc() != lba && !m_media->Seek(lba)) [[unlikely]] 169 { 170 WARNING_LOG("Seek to LBA {} failed", lba); 171 return false; 172 } 173 174 if (!m_media->ReadRawSector(data, subq)) [[unlikely]] 175 { 176 WARNING_LOG("Read of LBA {} failed", lba); 177 return false; 178 } 179 180 return true; 181 } 182 183 bool CDROMAsyncReader::WaitForReadToComplete() 184 { 185 // Safe without locking with memory_order_seq_cst. 186 if (!m_next_position_set.load() && m_buffer_count.load() > 0) 187 { 188 TRACE_LOG("Returning sector {}", m_buffers[m_buffer_front.load()].lba); 189 return m_buffers[m_buffer_front.load()].result; 190 } 191 192 Common::Timer wait_timer; 193 DEBUG_LOG("Sector read pending, waiting"); 194 195 std::unique_lock<std::mutex> lock(m_mutex); 196 m_notify_read_complete_cv.wait( 197 lock, [this]() { return (m_buffer_count.load() > 0 || m_seek_error.load()) && !m_next_position_set.load(); }); 198 if (m_seek_error.load()) [[unlikely]] 199 { 200 m_seek_error.store(false); 201 return false; 202 } 203 204 const u32 front = m_buffer_front.load(); 205 const double wait_time = wait_timer.GetTimeMilliseconds(); 206 if (wait_time > 1.0f) [[unlikely]] 207 WARNING_LOG("Had to wait {:.2f} msec for LBA {}", wait_time, m_buffers[front].lba); 208 209 TRACE_LOG("Returning sector {} after waiting", m_buffers[front].lba); 210 return m_buffers[front].result; 211 } 212 213 void CDROMAsyncReader::WaitForIdle() 214 { 215 if (!IsUsingThread()) 216 return; 217 218 std::unique_lock<std::mutex> lock(m_mutex); 219 m_notify_read_complete_cv.wait(lock, [this]() { return (!m_is_reading.load() && !m_next_position_set.load()); }); 220 } 221 222 void CDROMAsyncReader::EmptyBuffers() 223 { 224 m_buffer_front.store(0); 225 m_buffer_back.store(0); 226 m_buffer_count.store(0); 227 } 228 229 bool CDROMAsyncReader::ReadSectorIntoBuffer(std::unique_lock<std::mutex>& lock) 230 { 231 Common::Timer timer; 232 233 const u32 slot = m_buffer_back.load(); 234 m_buffer_back.store((slot + 1) % static_cast<u32>(m_buffers.size())); 235 236 BufferSlot& buffer = m_buffers[slot]; 237 buffer.lba = m_media->GetPositionOnDisc(); 238 m_is_reading.store(true); 239 lock.unlock(); 240 241 TRACE_LOG("Reading LBA {}...", buffer.lba); 242 243 buffer.result = m_media->ReadRawSector(buffer.data.data(), &buffer.subq); 244 if (buffer.result) [[likely]] 245 { 246 const double read_time = timer.GetTimeMilliseconds(); 247 if (read_time > 1.0f) [[unlikely]] 248 DEV_LOG("Read LBA {} took {:.2f} msec", buffer.lba, read_time); 249 } 250 else 251 { 252 ERROR_LOG("Read of LBA {} failed", buffer.lba); 253 } 254 255 lock.lock(); 256 m_is_reading.store(false); 257 m_buffer_count.fetch_add(1); 258 m_notify_read_complete_cv.notify_all(); 259 return true; 260 } 261 262 void CDROMAsyncReader::ReadSectorNonThreaded(CDImage::LBA lba) 263 { 264 Common::Timer timer; 265 266 m_buffers.resize(1); 267 m_seek_error.store(false); 268 EmptyBuffers(); 269 270 if (m_media->GetPositionOnDisc() != lba && !m_media->Seek(lba)) 271 { 272 WARNING_LOG("Seek to LBA {} failed", lba); 273 m_seek_error.store(true); 274 return; 275 } 276 277 BufferSlot& buffer = m_buffers.front(); 278 buffer.lba = m_media->GetPositionOnDisc(); 279 280 TRACE_LOG("Reading LBA {}...", buffer.lba); 281 282 buffer.result = m_media->ReadRawSector(buffer.data.data(), &buffer.subq); 283 if (buffer.result) [[likely]] 284 { 285 const double read_time = timer.GetTimeMilliseconds(); 286 if (read_time > 1.0f) [[unlikely]] 287 DEV_LOG("Read LBA {} took {:.2f} msec", buffer.lba, read_time); 288 } 289 else 290 { 291 ERROR_LOG("Read of LBA {} failed", buffer.lba); 292 } 293 294 m_buffer_count.fetch_add(1); 295 } 296 297 void CDROMAsyncReader::CancelReadahead() 298 { 299 DEV_LOG("Cancelling readahead"); 300 301 std::unique_lock lock(m_mutex); 302 303 // wait until the read thread is idle 304 m_notify_read_complete_cv.wait(lock, [this]() { return !m_is_reading.load(); }); 305 306 // prevent it from doing any more when it re-acquires the lock 307 m_can_readahead.store(false); 308 EmptyBuffers(); 309 } 310 311 void CDROMAsyncReader::WorkerThreadEntryPoint() 312 { 313 std::unique_lock lock(m_mutex); 314 315 for (;;) 316 { 317 m_do_read_cv.wait( 318 lock, [this]() { return (m_shutdown_flag.load() || m_next_position_set.load() || m_can_readahead.load()); }); 319 if (m_shutdown_flag.load()) 320 break; 321 322 for (;;) 323 { 324 if (m_next_position_set.load()) 325 { 326 // discard buffers, we're seeking to a new location 327 const CDImage::LBA seek_location = m_next_position.load(); 328 EmptyBuffers(); 329 m_next_position_set.store(false); 330 m_seek_error.store(false); 331 m_is_reading.store(true); 332 lock.unlock(); 333 334 // seek without lock held in case it takes time 335 DEBUG_LOG("Seeking to LBA {}...", seek_location); 336 const bool seek_result = (m_media->GetPositionOnDisc() == seek_location || m_media->Seek(seek_location)); 337 338 lock.lock(); 339 m_is_reading.store(false); 340 341 // did another request come in? abort if so 342 if (m_next_position_set.load()) 343 continue; 344 345 // did we fail the seek? 346 if (!seek_result) [[unlikely]] 347 { 348 // add the error result, and don't try to read ahead 349 WARNING_LOG("Seek to LBA {} failed", seek_location); 350 m_seek_error.store(true); 351 m_notify_read_complete_cv.notify_all(); 352 break; 353 } 354 355 // go go read ahead! 356 m_can_readahead.store(true); 357 } 358 359 if (!m_can_readahead.load()) 360 break; 361 362 // readahead time! read as many sectors as we have space for 363 DEBUG_LOG("Reading ahead {} sectors...", static_cast<u32>(m_buffers.size()) - m_buffer_count.load()); 364 while (m_buffer_count.load() < static_cast<u32>(m_buffers.size())) 365 { 366 if (m_next_position_set.load()) 367 { 368 // a seek request came in while we're reading, so bail out 369 break; 370 } 371 372 // stop reading if we hit the end or get an error 373 if (!ReadSectorIntoBuffer(lock)) 374 break; 375 } 376 377 // readahead buffer is full or errored at this point 378 m_can_readahead.store(false); 379 break; 380 } 381 } 382 }