qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

job.c (33068B)


      1 /*
      2  * Background jobs (long-running operations)
      3  *
      4  * Copyright (c) 2011 IBM Corp.
      5  * Copyright (c) 2012, 2018 Red Hat, Inc.
      6  *
      7  * Permission is hereby granted, free of charge, to any person obtaining a copy
      8  * of this software and associated documentation files (the "Software"), to deal
      9  * in the Software without restriction, including without limitation the rights
     10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
     11  * copies of the Software, and to permit persons to whom the Software is
     12  * furnished to do so, subject to the following conditions:
     13  *
     14  * The above copyright notice and this permission notice shall be included in
     15  * all copies or substantial portions of the Software.
     16  *
     17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
     20  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     23  * THE SOFTWARE.
     24  */
     25 
     26 #include "qemu/osdep.h"
     27 #include "qapi/error.h"
     28 #include "qemu/job.h"
     29 #include "qemu/id.h"
     30 #include "qemu/main-loop.h"
     31 #include "block/aio-wait.h"
     32 #include "trace/trace-root.h"
     33 #include "qapi/qapi-events-job.h"
     34 
     35 /*
     36  * The job API is composed of two categories of functions.
     37  *
     38  * The first includes functions used by the monitor.  The monitor is
     39  * peculiar in that it accesses the job list with job_get, and
     40  * therefore needs consistency across job_get and the actual operation
     41  * (e.g. job_user_cancel). To achieve this consistency, the caller
     42  * calls job_lock/job_unlock itself around the whole operation.
     43  *
     44  *
     45  * The second includes functions used by the job drivers and sometimes
     46  * by the core block layer. These delegate the locking to the callee instead.
     47  */
     48 
/*
 * job_mutex protects the jobs list, but also makes the
 * struct job fields thread-safe.
 *
 * It is initialised by the job_init() constructor and taken/released
 * through job_lock()/job_unlock().
 */
QemuMutex job_mutex;

/*
 * List of jobs: job_create() inserts each new job here and
 * job_unref_locked() removes it when the job is freed.
 * Protected by job_mutex.
 */
static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
     57 
/*
 * Job State Transition Table
 *
 * Indexed as JobSTT[current][next]: a non-zero entry means a job may move
 * from status "current" (row) to status "next" (column).  The table is
 * consulted by job_state_transition_locked(), which asserts that the
 * requested transition is allowed.  The one-letter column legend uses the
 * same letters as the row labels.
 */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
     73 
/*
 * Verb permission table
 *
 * Indexed as JobVerbTable[verb][status]: a non-zero entry means that @verb
 * is accepted while the job is in @status.  job_apply_verb_locked() looks
 * the pair up and fails with -EPERM when the entry is zero.  The column
 * legend lists the job statuses by their one-letter abbreviation.
 */
bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
     84 
/*
 * Transactional group of jobs
 *
 * Created by job_txn_new() with one reference; every job added through
 * job_txn_add_job_locked() takes a further reference.  The structure is
 * freed when the reference count drops to zero.
 */
struct JobTxn {

    /*
     * Is this txn being cancelled?
     * Set by job_completed_txn_abort_locked() so that only the first
     * failing job drives the abort.
     */
    bool aborting;

    /* List of jobs (linked through Job.txn_list) */
    QLIST_HEAD(, Job) jobs;

    /* Reference count */
    int refcnt;
};
     97 
/* Acquire the global job_mutex. */
void job_lock(void)
{
    qemu_mutex_lock(&job_mutex);
}
    102 
/* Release the global job_mutex. */
void job_unlock(void)
{
    qemu_mutex_unlock(&job_mutex);
}
    107 
/*
 * Initialise job_mutex.  Marked with the constructor attribute, so it is
 * run automatically at load time rather than being called explicitly.
 */
static void __attribute__((__constructor__)) job_init(void)
{
    qemu_mutex_init(&job_mutex);
}
    112 
    113 JobTxn *job_txn_new(void)
    114 {
    115     JobTxn *txn = g_new0(JobTxn, 1);
    116     QLIST_INIT(&txn->jobs);
    117     txn->refcnt = 1;
    118     return txn;
    119 }
    120 
    121 /* Called with job_mutex held. */
    122 static void job_txn_ref_locked(JobTxn *txn)
    123 {
    124     txn->refcnt++;
    125 }
    126 
    127 void job_txn_unref_locked(JobTxn *txn)
    128 {
    129     if (txn && --txn->refcnt == 0) {
    130         g_free(txn);
    131     }
    132 }
    133 
/*
 * Wrapper around job_txn_unref_locked() that runs it under
 * JOB_LOCK_GUARD().  @txn may be NULL.
 */
void job_txn_unref(JobTxn *txn)
{
    JOB_LOCK_GUARD();
    job_txn_unref_locked(txn);
}
    139 
    140 /**
    141  * @txn: The transaction (may be NULL)
    142  * @job: Job to add to the transaction
    143  *
    144  * Add @job to the transaction.  The @job must not already be in a transaction.
    145  * The caller must call either job_txn_unref() or job_completed() to release
    146  * the reference that is automatically grabbed here.
    147  *
    148  * If @txn is NULL, the function does nothing.
    149  *
    150  * Called with job_mutex held.
    151  */
    152 static void job_txn_add_job_locked(JobTxn *txn, Job *job)
    153 {
    154     if (!txn) {
    155         return;
    156     }
    157 
    158     assert(!job->txn);
    159     job->txn = txn;
    160 
    161     QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    162     job_txn_ref_locked(txn);
    163 }
    164 
    165 /* Called with job_mutex held. */
    166 static void job_txn_del_job_locked(Job *job)
    167 {
    168     if (job->txn) {
    169         QLIST_REMOVE(job, txn_list);
    170         job_txn_unref_locked(job->txn);
    171         job->txn = NULL;
    172     }
    173 }
    174 
    175 /* Called with job_mutex held, but releases it temporarily. */
    176 static int job_txn_apply_locked(Job *job, int fn(Job *))
    177 {
    178     Job *other_job, *next;
    179     JobTxn *txn = job->txn;
    180     int rc = 0;
    181 
    182     /*
    183      * Similar to job_completed_txn_abort, we take each job's lock before
    184      * applying fn, but since we assume that outer_ctx is held by the caller,
    185      * we need to release it here to avoid holding the lock twice - which would
    186      * break AIO_WAIT_WHILE from within fn.
    187      */
    188     job_ref_locked(job);
    189 
    190     QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
    191         rc = fn(other_job);
    192         if (rc) {
    193             break;
    194         }
    195     }
    196 
    197     job_unref_locked(job);
    198     return rc;
    199 }
    200 
    201 bool job_is_internal(Job *job)
    202 {
    203     return (job->id == NULL);
    204 }
    205 
    206 /* Called with job_mutex held. */
    207 static void job_state_transition_locked(Job *job, JobStatus s1)
    208 {
    209     JobStatus s0 = job->status;
    210     assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
    211     trace_job_state_transition(job, job->ret,
    212                                JobSTT[s0][s1] ? "allowed" : "disallowed",
    213                                JobStatus_str(s0), JobStatus_str(s1));
    214     assert(JobSTT[s0][s1]);
    215     job->status = s1;
    216 
    217     if (!job_is_internal(job) && s1 != s0) {
    218         qapi_event_send_job_status_change(job->id, job->status);
    219     }
    220 }
    221 
    222 int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
    223 {
    224     JobStatus s0 = job->status;
    225     assert(verb >= 0 && verb < JOB_VERB__MAX);
    226     trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
    227                          JobVerbTable[verb][s0] ? "allowed" : "prohibited");
    228     if (JobVerbTable[verb][s0]) {
    229         return 0;
    230     }
    231     error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
    232                job->id, JobStatus_str(s0), JobVerb_str(verb));
    233     return -EPERM;
    234 }
    235 
    236 JobType job_type(const Job *job)
    237 {
    238     return job->driver->job_type;
    239 }
    240 
    241 const char *job_type_str(const Job *job)
    242 {
    243     return JobType_str(job_type(job));
    244 }
    245 
    246 bool job_is_cancelled_locked(Job *job)
    247 {
    248     /* force_cancel may be true only if cancelled is true, too */
    249     assert(job->cancelled || !job->force_cancel);
    250     return job->force_cancel;
    251 }
    252 
    253 bool job_is_cancelled(Job *job)
    254 {
    255     JOB_LOCK_GUARD();
    256     return job_is_cancelled_locked(job);
    257 }
    258 
    259 /* Called with job_mutex held. */
    260 static bool job_cancel_requested_locked(Job *job)
    261 {
    262     return job->cancelled;
    263 }
    264 
    265 bool job_cancel_requested(Job *job)
    266 {
    267     JOB_LOCK_GUARD();
    268     return job_cancel_requested_locked(job);
    269 }
    270 
    271 bool job_is_ready_locked(Job *job)
    272 {
    273     switch (job->status) {
    274     case JOB_STATUS_UNDEFINED:
    275     case JOB_STATUS_CREATED:
    276     case JOB_STATUS_RUNNING:
    277     case JOB_STATUS_PAUSED:
    278     case JOB_STATUS_WAITING:
    279     case JOB_STATUS_PENDING:
    280     case JOB_STATUS_ABORTING:
    281     case JOB_STATUS_CONCLUDED:
    282     case JOB_STATUS_NULL:
    283         return false;
    284     case JOB_STATUS_READY:
    285     case JOB_STATUS_STANDBY:
    286         return true;
    287     default:
    288         g_assert_not_reached();
    289     }
    290     return false;
    291 }
    292 
    293 bool job_is_ready(Job *job)
    294 {
    295     JOB_LOCK_GUARD();
    296     return job_is_ready_locked(job);
    297 }
    298 
    299 bool job_is_completed_locked(Job *job)
    300 {
    301     switch (job->status) {
    302     case JOB_STATUS_UNDEFINED:
    303     case JOB_STATUS_CREATED:
    304     case JOB_STATUS_RUNNING:
    305     case JOB_STATUS_PAUSED:
    306     case JOB_STATUS_READY:
    307     case JOB_STATUS_STANDBY:
    308         return false;
    309     case JOB_STATUS_WAITING:
    310     case JOB_STATUS_PENDING:
    311     case JOB_STATUS_ABORTING:
    312     case JOB_STATUS_CONCLUDED:
    313     case JOB_STATUS_NULL:
    314         return true;
    315     default:
    316         g_assert_not_reached();
    317     }
    318     return false;
    319 }
    320 
    321 static bool job_is_completed(Job *job)
    322 {
    323     JOB_LOCK_GUARD();
    324     return job_is_completed_locked(job);
    325 }
    326 
    327 static bool job_started_locked(Job *job)
    328 {
    329     return job->co;
    330 }
    331 
/*
 * Return true if at least one pause request is outstanding on @job
 * (pause_count is positive).
 *
 * Called with job_mutex held.
 */
static bool job_should_pause_locked(Job *job)
{
    return job->pause_count > 0;
}
    337 
    338 Job *job_next_locked(Job *job)
    339 {
    340     if (!job) {
    341         return QLIST_FIRST(&jobs);
    342     }
    343     return QLIST_NEXT(job, job_list);
    344 }
    345 
    346 Job *job_next(Job *job)
    347 {
    348     JOB_LOCK_GUARD();
    349     return job_next_locked(job);
    350 }
    351 
    352 Job *job_get_locked(const char *id)
    353 {
    354     Job *job;
    355 
    356     QLIST_FOREACH(job, &jobs, job_list) {
    357         if (job->id && !strcmp(id, job->id)) {
    358             return job;
    359         }
    360     }
    361 
    362     return NULL;
    363 }
    364 
/*
 * Change the AioContext recorded for @job to @ctx.
 *
 * The job has to be quiescent: either paused or already completed (this
 * is asserted).  The update is made under JOB_LOCK_GUARD().
 */
void job_set_aio_context(Job *job, AioContext *ctx)
{
    /* protect against read in job_finish_sync_locked and job_start */
    GLOBAL_STATE_CODE();
    /* protect against read in job_do_yield_locked */
    JOB_LOCK_GUARD();
    /* ensure the job is quiescent while the AioContext is changed */
    assert(job->paused || job_is_completed_locked(job));
    job->aio_context = ctx;
}
    375 
    376 /* Called with job_mutex *not* held. */
    377 static void job_sleep_timer_cb(void *opaque)
    378 {
    379     Job *job = opaque;
    380 
    381     job_enter(job);
    382 }
    383 
    384 void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
    385                  AioContext *ctx, int flags, BlockCompletionFunc *cb,
    386                  void *opaque, Error **errp)
    387 {
    388     Job *job;
    389 
    390     JOB_LOCK_GUARD();
    391 
    392     if (job_id) {
    393         if (flags & JOB_INTERNAL) {
    394             error_setg(errp, "Cannot specify job ID for internal job");
    395             return NULL;
    396         }
    397         if (!id_wellformed(job_id)) {
    398             error_setg(errp, "Invalid job ID '%s'", job_id);
    399             return NULL;
    400         }
    401         if (job_get_locked(job_id)) {
    402             error_setg(errp, "Job ID '%s' already in use", job_id);
    403             return NULL;
    404         }
    405     } else if (!(flags & JOB_INTERNAL)) {
    406         error_setg(errp, "An explicit job ID is required");
    407         return NULL;
    408     }
    409 
    410     job = g_malloc0(driver->instance_size);
    411     job->driver        = driver;
    412     job->id            = g_strdup(job_id);
    413     job->refcnt        = 1;
    414     job->aio_context   = ctx;
    415     job->busy          = false;
    416     job->paused        = true;
    417     job->pause_count   = 1;
    418     job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
    419     job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
    420     job->cb            = cb;
    421     job->opaque        = opaque;
    422 
    423     progress_init(&job->progress);
    424 
    425     notifier_list_init(&job->on_finalize_cancelled);
    426     notifier_list_init(&job->on_finalize_completed);
    427     notifier_list_init(&job->on_pending);
    428     notifier_list_init(&job->on_ready);
    429     notifier_list_init(&job->on_idle);
    430 
    431     job_state_transition_locked(job, JOB_STATUS_CREATED);
    432     aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
    433                    QEMU_CLOCK_REALTIME, SCALE_NS,
    434                    job_sleep_timer_cb, job);
    435 
    436     QLIST_INSERT_HEAD(&jobs, job, job_list);
    437 
    438     /* Single jobs are modeled as single-job transactions for sake of
    439      * consolidating the job management logic */
    440     if (!txn) {
    441         txn = job_txn_new();
    442         job_txn_add_job_locked(txn, job);
    443         job_txn_unref_locked(txn);
    444     } else {
    445         job_txn_add_job_locked(txn, job);
    446     }
    447 
    448     return job;
    449 }
    450 
    451 void job_ref_locked(Job *job)
    452 {
    453     ++job->refcnt;
    454 }
    455 
    456 void job_unref_locked(Job *job)
    457 {
    458     GLOBAL_STATE_CODE();
    459 
    460     if (--job->refcnt == 0) {
    461         assert(job->status == JOB_STATUS_NULL);
    462         assert(!timer_pending(&job->sleep_timer));
    463         assert(!job->txn);
    464 
    465         if (job->driver->free) {
    466             AioContext *aio_context = job->aio_context;
    467             job_unlock();
    468             /* FIXME: aiocontext lock is required because cb calls blk_unref */
    469             aio_context_acquire(aio_context);
    470             job->driver->free(job);
    471             aio_context_release(aio_context);
    472             job_lock();
    473         }
    474 
    475         QLIST_REMOVE(job, job_list);
    476 
    477         progress_destroy(&job->progress);
    478         error_free(job->err);
    479         g_free(job->id);
    480         g_free(job);
    481     }
    482 }
    483 
/* Forward @done to progress_work_done() on @job's progress meter. */
void job_progress_update(Job *job, uint64_t done)
{
    progress_work_done(&job->progress, done);
}
    488 
/* Forward @remaining to progress_set_remaining() on @job's progress meter. */
void job_progress_set_remaining(Job *job, uint64_t remaining)
{
    progress_set_remaining(&job->progress, remaining);
}
    493 
/*
 * Forward @delta to progress_increase_remaining() on @job's progress
 * meter.
 */
void job_progress_increase_remaining(Job *job, uint64_t delta)
{
    progress_increase_remaining(&job->progress, delta);
}
    498 
/**
 * To be called when a cancelled job is finalised.
 * Notifies everything registered on @job's on_finalize_cancelled list,
 * passing @job as the notification data.
 * Called with job_mutex held.
 */
static void job_event_cancelled_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_cancelled, job);
}
    507 
/**
 * To be called when a successfully completed job is finalised.
 * Notifies everything registered on @job's on_finalize_completed list,
 * passing @job as the notification data.
 * Called with job_mutex held.
 */
static void job_event_completed_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_completed, job);
}
    516 
/*
 * Notify everything registered on @job's on_pending list, passing @job as
 * the notification data.
 * Called with job_mutex held.
 */
static void job_event_pending_locked(Job *job)
{
    notifier_list_notify(&job->on_pending, job);
}
    522 
/*
 * Notify everything registered on @job's on_ready list, passing @job as
 * the notification data.
 * Called with job_mutex held.
 */
static void job_event_ready_locked(Job *job)
{
    notifier_list_notify(&job->on_ready, job);
}
    528 
/*
 * Notify everything registered on @job's on_idle list, passing @job as
 * the notification data.  job_do_yield_locked() calls this right after
 * clearing job->busy.
 * Called with job_mutex held.
 */
static void job_event_idle_locked(Job *job)
{
    notifier_list_notify(&job->on_idle, job);
}
    534 
    535 void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
    536 {
    537     if (!job_started_locked(job)) {
    538         return;
    539     }
    540     if (job->deferred_to_main_loop) {
    541         return;
    542     }
    543 
    544     if (job->busy) {
    545         return;
    546     }
    547 
    548     if (fn && !fn(job)) {
    549         return;
    550     }
    551 
    552     assert(!job->deferred_to_main_loop);
    553     timer_del(&job->sleep_timer);
    554     job->busy = true;
    555     job_unlock();
    556     aio_co_wake(job->co);
    557     job_lock();
    558 }
    559 
/*
 * Unconditional variant of job_enter_cond_locked() (no condition
 * callback), run under JOB_LOCK_GUARD().
 */
void job_enter(Job *job)
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, NULL);
}
    565 
    566 /* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
    567  * Reentering the job coroutine with job_enter() before the timer has expired
    568  * is allowed and cancels the timer.
    569  *
    570  * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
    571  * called explicitly.
    572  *
    573  * Called with job_mutex held, but releases it temporarily.
    574  */
    575 static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
    576 {
    577     AioContext *next_aio_context;
    578 
    579     if (ns != -1) {
    580         timer_mod(&job->sleep_timer, ns);
    581     }
    582     job->busy = false;
    583     job_event_idle_locked(job);
    584     job_unlock();
    585     qemu_coroutine_yield();
    586     job_lock();
    587 
    588     next_aio_context = job->aio_context;
    589     /*
    590      * Coroutine has resumed, but in the meanwhile the job AioContext
    591      * might have changed via bdrv_try_change_aio_context(), so we need to move
    592      * the coroutine too in the new aiocontext.
    593      */
    594     while (qemu_get_current_aio_context() != next_aio_context) {
    595         job_unlock();
    596         aio_co_reschedule_self(next_aio_context);
    597         job_lock();
    598         next_aio_context = job->aio_context;
    599     }
    600 
    601     /* Set by job_enter_cond_locked() before re-entering the coroutine.  */
    602     assert(job->busy);
    603 }
    604 
    605 /* Called with job_mutex held, but releases it temporarily. */
    606 static void coroutine_fn job_pause_point_locked(Job *job)
    607 {
    608     assert(job && job_started_locked(job));
    609 
    610     if (!job_should_pause_locked(job)) {
    611         return;
    612     }
    613     if (job_is_cancelled_locked(job)) {
    614         return;
    615     }
    616 
    617     if (job->driver->pause) {
    618         job_unlock();
    619         job->driver->pause(job);
    620         job_lock();
    621     }
    622 
    623     if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
    624         JobStatus status = job->status;
    625         job_state_transition_locked(job, status == JOB_STATUS_READY
    626                                     ? JOB_STATUS_STANDBY
    627                                     : JOB_STATUS_PAUSED);
    628         job->paused = true;
    629         job_do_yield_locked(job, -1);
    630         job->paused = false;
    631         job_state_transition_locked(job, status);
    632     }
    633 
    634     if (job->driver->resume) {
    635         job_unlock();
    636         job->driver->resume(job);
    637         job_lock();
    638     }
    639 }
    640 
/* Same as job_pause_point_locked(), run under JOB_LOCK_GUARD(). */
void coroutine_fn job_pause_point(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_point_locked(job);
}
    646 
    647 void coroutine_fn job_yield(Job *job)
    648 {
    649     JOB_LOCK_GUARD();
    650     assert(job->busy);
    651 
    652     /* Check cancellation *before* setting busy = false, too!  */
    653     if (job_is_cancelled_locked(job)) {
    654         return;
    655     }
    656 
    657     if (!job_should_pause_locked(job)) {
    658         job_do_yield_locked(job, -1);
    659     }
    660 
    661     job_pause_point_locked(job);
    662 }
    663 
    664 void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
    665 {
    666     JOB_LOCK_GUARD();
    667     assert(job->busy);
    668 
    669     /* Check cancellation *before* setting busy = false, too!  */
    670     if (job_is_cancelled_locked(job)) {
    671         return;
    672     }
    673 
    674     if (!job_should_pause_locked(job)) {
    675         job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    676     }
    677 
    678     job_pause_point_locked(job);
    679 }
    680 
    681 /* Assumes the job_mutex is held */
    682 static bool job_timer_not_pending_locked(Job *job)
    683 {
    684     return !timer_pending(&job->sleep_timer);
    685 }
    686 
    687 void job_pause_locked(Job *job)
    688 {
    689     job->pause_count++;
    690     if (!job->paused) {
    691         job_enter_cond_locked(job, NULL);
    692     }
    693 }
    694 
/* Same as job_pause_locked(), run under JOB_LOCK_GUARD(). */
void job_pause(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_locked(job);
}
    700 
    701 void job_resume_locked(Job *job)
    702 {
    703     assert(job->pause_count > 0);
    704     job->pause_count--;
    705     if (job->pause_count) {
    706         return;
    707     }
    708 
    709     /* kick only if no timer is pending */
    710     job_enter_cond_locked(job, job_timer_not_pending_locked);
    711 }
    712 
/* Same as job_resume_locked(), run under JOB_LOCK_GUARD(). */
void job_resume(Job *job)
{
    JOB_LOCK_GUARD();
    job_resume_locked(job);
}
    718 
    719 void job_user_pause_locked(Job *job, Error **errp)
    720 {
    721     if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
    722         return;
    723     }
    724     if (job->user_paused) {
    725         error_setg(errp, "Job is already paused");
    726         return;
    727     }
    728     job->user_paused = true;
    729     job_pause_locked(job);
    730 }
    731 
/*
 * Return true if @job is currently flagged as paused by the user (the flag
 * is set by job_user_pause_locked() and cleared on user resume or cancel).
 */
bool job_user_paused_locked(Job *job)
{
    return job->user_paused;
}
    736 
    737 void job_user_resume_locked(Job *job, Error **errp)
    738 {
    739     assert(job);
    740     GLOBAL_STATE_CODE();
    741     if (!job->user_paused || job->pause_count <= 0) {
    742         error_setg(errp, "Can't resume a job that was not paused");
    743         return;
    744     }
    745     if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
    746         return;
    747     }
    748     if (job->driver->user_resume) {
    749         job_unlock();
    750         job->driver->user_resume(job);
    751         job_lock();
    752     }
    753     job->user_paused = false;
    754     job_resume_locked(job);
    755 }
    756 
    757 /* Called with job_mutex held, but releases it temporarily. */
    758 static void job_do_dismiss_locked(Job *job)
    759 {
    760     assert(job);
    761     job->busy = false;
    762     job->paused = false;
    763     job->deferred_to_main_loop = true;
    764 
    765     job_txn_del_job_locked(job);
    766 
    767     job_state_transition_locked(job, JOB_STATUS_NULL);
    768     job_unref_locked(job);
    769 }
    770 
    771 void job_dismiss_locked(Job **jobptr, Error **errp)
    772 {
    773     Job *job = *jobptr;
    774     /* similarly to _complete, this is QMP-interface only. */
    775     assert(job->id);
    776     if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
    777         return;
    778     }
    779 
    780     job_do_dismiss_locked(job);
    781     *jobptr = NULL;
    782 }
    783 
/*
 * Dismiss a job that is still in the CREATED status (asserted), running
 * job_do_dismiss_locked() under JOB_LOCK_GUARD().
 */
void job_early_fail(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->status == JOB_STATUS_CREATED);
    job_do_dismiss_locked(job);
}
    790 
    791 /* Called with job_mutex held. */
    792 static void job_conclude_locked(Job *job)
    793 {
    794     job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
    795     if (job->auto_dismiss || !job_started_locked(job)) {
    796         job_do_dismiss_locked(job);
    797     }
    798 }
    799 
    800 /* Called with job_mutex held. */
    801 static void job_update_rc_locked(Job *job)
    802 {
    803     if (!job->ret && job_is_cancelled_locked(job)) {
    804         job->ret = -ECANCELED;
    805     }
    806     if (job->ret) {
    807         if (!job->err) {
    808             error_setg(&job->err, "%s", strerror(-job->ret));
    809         }
    810         job_state_transition_locked(job, JOB_STATUS_ABORTING);
    811     }
    812 }
    813 
    814 static void job_commit(Job *job)
    815 {
    816     assert(!job->ret);
    817     GLOBAL_STATE_CODE();
    818     if (job->driver->commit) {
    819         job->driver->commit(job);
    820     }
    821 }
    822 
    823 static void job_abort(Job *job)
    824 {
    825     assert(job->ret);
    826     GLOBAL_STATE_CODE();
    827     if (job->driver->abort) {
    828         job->driver->abort(job);
    829     }
    830 }
    831 
    832 static void job_clean(Job *job)
    833 {
    834     GLOBAL_STATE_CODE();
    835     if (job->driver->clean) {
    836         job->driver->clean(job);
    837     }
    838 }
    839 
    840 /*
    841  * Called with job_mutex held, but releases it temporarily.
    842  * Takes AioContext lock internally to invoke a job->driver callback.
    843  */
    844 static int job_finalize_single_locked(Job *job)
    845 {
    846     int job_ret;
    847     AioContext *ctx = job->aio_context;
    848 
    849     assert(job_is_completed_locked(job));
    850 
    851     /* Ensure abort is called for late-transactional failures */
    852     job_update_rc_locked(job);
    853 
    854     job_ret = job->ret;
    855     job_unlock();
    856     aio_context_acquire(ctx);
    857 
    858     if (!job_ret) {
    859         job_commit(job);
    860     } else {
    861         job_abort(job);
    862     }
    863     job_clean(job);
    864 
    865     if (job->cb) {
    866         job->cb(job->opaque, job_ret);
    867     }
    868 
    869     aio_context_release(ctx);
    870     job_lock();
    871 
    872     /* Emit events only if we actually started */
    873     if (job_started_locked(job)) {
    874         if (job_is_cancelled_locked(job)) {
    875             job_event_cancelled_locked(job);
    876         } else {
    877             job_event_completed_locked(job);
    878         }
    879     }
    880 
    881     job_txn_del_job_locked(job);
    882     job_conclude_locked(job);
    883     return 0;
    884 }
    885 
    886 /*
    887  * Called with job_mutex held, but releases it temporarily.
    888  * Takes AioContext lock internally to invoke a job->driver callback.
    889  */
    890 static void job_cancel_async_locked(Job *job, bool force)
    891 {
    892     AioContext *ctx = job->aio_context;
    893     GLOBAL_STATE_CODE();
    894     if (job->driver->cancel) {
    895         job_unlock();
    896         aio_context_acquire(ctx);
    897         force = job->driver->cancel(job, force);
    898         aio_context_release(ctx);
    899         job_lock();
    900     } else {
    901         /* No .cancel() means the job will behave as if force-cancelled */
    902         force = true;
    903     }
    904 
    905     if (job->user_paused) {
    906         /* Do not call job_enter here, the caller will handle it.  */
    907         if (job->driver->user_resume) {
    908             job_unlock();
    909             job->driver->user_resume(job);
    910             job_lock();
    911         }
    912         job->user_paused = false;
    913         assert(job->pause_count > 0);
    914         job->pause_count--;
    915     }
    916 
    917     /*
    918      * Ignore soft cancel requests after the job is already done
    919      * (We will still invoke job->driver->cancel() above, but if the
    920      * job driver supports soft cancelling and the job is done, that
    921      * should be a no-op, too.  We still call it so it can override
    922      * @force.)
    923      */
    924     if (force || !job->deferred_to_main_loop) {
    925         job->cancelled = true;
    926         /* To prevent 'force == false' overriding a previous 'force == true' */
    927         job->force_cancel |= force;
    928     }
    929 }
    930 
    931 /*
    932  * Called with job_mutex held, but releases it temporarily.
    933  * Takes AioContext lock internally to invoke a job->driver callback.
    934  */
    935 static void job_completed_txn_abort_locked(Job *job)
    936 {
    937     JobTxn *txn = job->txn;
    938     Job *other_job;
    939 
    940     if (txn->aborting) {
    941         /*
    942          * We are cancelled by another job, which will handle everything.
    943          */
    944         return;
    945     }
    946     txn->aborting = true;
    947     job_txn_ref_locked(txn);
    948 
    949     job_ref_locked(job);
    950 
    951     /* Other jobs are effectively cancelled by us, set the status for
    952      * them; this job, however, may or may not be cancelled, depending
    953      * on the caller, so leave it. */
    954     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
    955         if (other_job != job) {
    956             /*
    957              * This is a transaction: If one job failed, no result will matter.
    958              * Therefore, pass force=true to terminate all other jobs as quickly
    959              * as possible.
    960              */
    961             job_cancel_async_locked(other_job, true);
    962         }
    963     }
    964     while (!QLIST_EMPTY(&txn->jobs)) {
    965         other_job = QLIST_FIRST(&txn->jobs);
    966         if (!job_is_completed_locked(other_job)) {
    967             assert(job_cancel_requested_locked(other_job));
    968             job_finish_sync_locked(other_job, NULL, NULL);
    969         }
    970         job_finalize_single_locked(other_job);
    971     }
    972 
    973     job_unref_locked(job);
    974     job_txn_unref_locked(txn);
    975 }
    976 
    977 /* Called with job_mutex held, but releases it temporarily */
    978 static int job_prepare_locked(Job *job)
    979 {
    980     int ret;
    981     AioContext *ctx = job->aio_context;
    982 
    983     GLOBAL_STATE_CODE();
    984 
    985     if (job->ret == 0 && job->driver->prepare) {
    986         job_unlock();
    987         aio_context_acquire(ctx);
    988         ret = job->driver->prepare(job);
    989         aio_context_release(ctx);
    990         job_lock();
    991         job->ret = ret;
    992         job_update_rc_locked(job);
    993     }
    994 
    995     return job->ret;
    996 }
    997 
    998 /* Called with job_mutex held */
    999 static int job_needs_finalize_locked(Job *job)
   1000 {
   1001     return !job->auto_finalize;
   1002 }
   1003 
   1004 /* Called with job_mutex held */
   1005 static void job_do_finalize_locked(Job *job)
   1006 {
   1007     int rc;
   1008     assert(job && job->txn);
   1009 
   1010     /* prepare the transaction to complete */
   1011     rc = job_txn_apply_locked(job, job_prepare_locked);
   1012     if (rc) {
   1013         job_completed_txn_abort_locked(job);
   1014     } else {
   1015         job_txn_apply_locked(job, job_finalize_single_locked);
   1016     }
   1017 }
   1018 
   1019 void job_finalize_locked(Job *job, Error **errp)
   1020 {
   1021     assert(job && job->id);
   1022     if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
   1023         return;
   1024     }
   1025     job_do_finalize_locked(job);
   1026 }
   1027 
   1028 /* Called with job_mutex held. */
   1029 static int job_transition_to_pending_locked(Job *job)
   1030 {
   1031     job_state_transition_locked(job, JOB_STATUS_PENDING);
   1032     if (!job->auto_finalize) {
   1033         job_event_pending_locked(job);
   1034     }
   1035     return 0;
   1036 }
   1037 
   1038 void job_transition_to_ready(Job *job)
   1039 {
   1040     JOB_LOCK_GUARD();
   1041     job_state_transition_locked(job, JOB_STATUS_READY);
   1042     job_event_ready_locked(job);
   1043 }
   1044 
   1045 /* Called with job_mutex held. */
   1046 static void job_completed_txn_success_locked(Job *job)
   1047 {
   1048     JobTxn *txn = job->txn;
   1049     Job *other_job;
   1050 
   1051     job_state_transition_locked(job, JOB_STATUS_WAITING);
   1052 
   1053     /*
   1054      * Successful completion, see if there are other running jobs in this
   1055      * txn.
   1056      */
   1057     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
   1058         if (!job_is_completed_locked(other_job)) {
   1059             return;
   1060         }
   1061         assert(other_job->ret == 0);
   1062     }
   1063 
   1064     job_txn_apply_locked(job, job_transition_to_pending_locked);
   1065 
   1066     /* If no jobs need manual finalization, automatically do so */
   1067     if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
   1068         job_do_finalize_locked(job);
   1069     }
   1070 }
   1071 
   1072 /* Called with job_mutex held. */
   1073 static void job_completed_locked(Job *job)
   1074 {
   1075     assert(job && job->txn && !job_is_completed_locked(job));
   1076 
   1077     job_update_rc_locked(job);
   1078     trace_job_completed(job, job->ret);
   1079     if (job->ret) {
   1080         job_completed_txn_abort_locked(job);
   1081     } else {
   1082         job_completed_txn_success_locked(job);
   1083     }
   1084 }
   1085 
   1086 /**
   1087  * Useful only as a type shim for aio_bh_schedule_oneshot.
   1088  * Called with job_mutex *not* held.
   1089  */
   1090 static void job_exit(void *opaque)
   1091 {
   1092     Job *job = (Job *)opaque;
   1093     JOB_LOCK_GUARD();
   1094     job_ref_locked(job);
   1095 
   1096     /* This is a lie, we're not quiescent, but still doing the completion
   1097      * callbacks. However, completion callbacks tend to involve operations that
   1098      * drain block nodes, and if .drained_poll still returned true, we would
   1099      * deadlock. */
   1100     job->busy = false;
   1101     job_event_idle_locked(job);
   1102 
   1103     job_completed_locked(job);
   1104     job_unref_locked(job);
   1105 }
   1106 
   1107 /**
   1108  * All jobs must allow a pause point before entering their job proper. This
   1109  * ensures that jobs can be paused prior to being started, then resumed later.
   1110  */
   1111 static void coroutine_fn job_co_entry(void *opaque)
   1112 {
   1113     Job *job = opaque;
   1114     int ret;
   1115 
   1116     assert(job && job->driver && job->driver->run);
   1117     WITH_JOB_LOCK_GUARD() {
   1118         assert(job->aio_context == qemu_get_current_aio_context());
   1119         job_pause_point_locked(job);
   1120     }
   1121     ret = job->driver->run(job, &job->err);
   1122     WITH_JOB_LOCK_GUARD() {
   1123         job->ret = ret;
   1124         job->deferred_to_main_loop = true;
   1125         job->busy = true;
   1126     }
   1127     aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
   1128 }
   1129 
   1130 void job_start(Job *job)
   1131 {
   1132     assert(qemu_in_main_thread());
   1133 
   1134     WITH_JOB_LOCK_GUARD() {
   1135         assert(job && !job_started_locked(job) && job->paused &&
   1136             job->driver && job->driver->run);
   1137         job->co = qemu_coroutine_create(job_co_entry, job);
   1138         job->pause_count--;
   1139         job->busy = true;
   1140         job->paused = false;
   1141         job_state_transition_locked(job, JOB_STATUS_RUNNING);
   1142     }
   1143     aio_co_enter(job->aio_context, job->co);
   1144 }
   1145 
   1146 void job_cancel_locked(Job *job, bool force)
   1147 {
   1148     if (job->status == JOB_STATUS_CONCLUDED) {
   1149         job_do_dismiss_locked(job);
   1150         return;
   1151     }
   1152     job_cancel_async_locked(job, force);
   1153     if (!job_started_locked(job)) {
   1154         job_completed_locked(job);
   1155     } else if (job->deferred_to_main_loop) {
   1156         /*
   1157          * job_cancel_async() ignores soft-cancel requests for jobs
   1158          * that are already done (i.e. deferred to the main loop).  We
   1159          * have to check again whether the job is really cancelled.
   1160          * (job_cancel_requested() and job_is_cancelled() are equivalent
   1161          * here, because job_cancel_async() will make soft-cancel
   1162          * requests no-ops when deferred_to_main_loop is true.  We
   1163          * choose to call job_is_cancelled() to show that we invoke
   1164          * job_completed_txn_abort() only for force-cancelled jobs.)
   1165          */
   1166         if (job_is_cancelled_locked(job)) {
   1167             job_completed_txn_abort_locked(job);
   1168         }
   1169     } else {
   1170         job_enter_cond_locked(job, NULL);
   1171     }
   1172 }
   1173 
   1174 void job_user_cancel_locked(Job *job, bool force, Error **errp)
   1175 {
   1176     if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
   1177         return;
   1178     }
   1179     job_cancel_locked(job, force);
   1180 }
   1181 
   1182 /* A wrapper around job_cancel_locked() taking an Error ** parameter so it may
   1183  * be used with job_finish_sync_locked() without the need for (rather nasty)
   1184  * function pointer casts there.
   1185  *
   1186  * Called with job_mutex held.
   1187  */
   1188 static void job_cancel_err_locked(Job *job, Error **errp)
   1189 {
   1190     job_cancel_locked(job, false);
   1191 }
   1192 
   1193 /**
   1194  * Same as job_cancel_err(), but force-cancel.
   1195  * Called with job_mutex held.
   1196  */
   1197 static void job_force_cancel_err_locked(Job *job, Error **errp)
   1198 {
   1199     job_cancel_locked(job, true);
   1200 }
   1201 
   1202 int job_cancel_sync_locked(Job *job, bool force)
   1203 {
   1204     if (force) {
   1205         return job_finish_sync_locked(job, &job_force_cancel_err_locked, NULL);
   1206     } else {
   1207         return job_finish_sync_locked(job, &job_cancel_err_locked, NULL);
   1208     }
   1209 }
   1210 
   1211 int job_cancel_sync(Job *job, bool force)
   1212 {
   1213     JOB_LOCK_GUARD();
   1214     return job_cancel_sync_locked(job, force);
   1215 }
   1216 
   1217 void job_cancel_sync_all(void)
   1218 {
   1219     Job *job;
   1220     JOB_LOCK_GUARD();
   1221 
   1222     while ((job = job_next_locked(NULL))) {
   1223         job_cancel_sync_locked(job, true);
   1224     }
   1225 }
   1226 
   1227 int job_complete_sync_locked(Job *job, Error **errp)
   1228 {
   1229     return job_finish_sync_locked(job, job_complete_locked, errp);
   1230 }
   1231 
   1232 void job_complete_locked(Job *job, Error **errp)
   1233 {
   1234     /* Should not be reachable via external interface for internal jobs */
   1235     assert(job->id);
   1236     GLOBAL_STATE_CODE();
   1237     if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
   1238         return;
   1239     }
   1240     if (job_cancel_requested_locked(job) || !job->driver->complete) {
   1241         error_setg(errp, "The active block job '%s' cannot be completed",
   1242                    job->id);
   1243         return;
   1244     }
   1245 
   1246     job_unlock();
   1247     job->driver->complete(job, errp);
   1248     job_lock();
   1249 }
   1250 
   1251 int job_finish_sync_locked(Job *job,
   1252                            void (*finish)(Job *, Error **errp),
   1253                            Error **errp)
   1254 {
   1255     Error *local_err = NULL;
   1256     int ret;
   1257     GLOBAL_STATE_CODE();
   1258 
   1259     job_ref_locked(job);
   1260 
   1261     if (finish) {
   1262         finish(job, &local_err);
   1263     }
   1264     if (local_err) {
   1265         error_propagate(errp, local_err);
   1266         job_unref_locked(job);
   1267         return -EBUSY;
   1268     }
   1269 
   1270     job_unlock();
   1271     AIO_WAIT_WHILE_UNLOCKED(job->aio_context,
   1272                             (job_enter(job), !job_is_completed(job)));
   1273     job_lock();
   1274 
   1275     ret = (job_is_cancelled_locked(job) && job->ret == 0)
   1276           ? -ECANCELED : job->ret;
   1277     job_unref_locked(job);
   1278     return ret;
   1279 }