qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

task.c (5799B)


      1 /*
      2  * QEMU I/O task
      3  *
      4  * Copyright (c) 2015 Red Hat, Inc.
      5  *
      6  * This library is free software; you can redistribute it and/or
      7  * modify it under the terms of the GNU Lesser General Public
      8  * License as published by the Free Software Foundation; either
      9  * version 2.1 of the License, or (at your option) any later version.
     10  *
     11  * This library is distributed in the hope that it will be useful,
     12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     14  * Lesser General Public License for more details.
     15  *
     16  * You should have received a copy of the GNU Lesser General Public
     17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
     18  *
     19  */
     20 
     21 #include "qemu/osdep.h"
     22 #include "io/task.h"
     23 #include "qapi/error.h"
     24 #include "qemu/thread.h"
     25 #include "qom/object.h"
     26 #include "trace.h"
     27 
     28 struct QIOTaskThreadData {
     29     QIOTaskWorker worker;
     30     gpointer opaque;
     31     GDestroyNotify destroy;
     32     GMainContext *context;
     33     GSource *completion;
     34 };
     35 
     36 
     37 struct QIOTask {
     38     Object *source;
     39     QIOTaskFunc func;
     40     gpointer opaque;
     41     GDestroyNotify destroy;
     42     Error *err;
     43     gpointer result;
     44     GDestroyNotify destroyResult;
     45     QemuMutex thread_lock;
     46     QemuCond thread_cond;
     47     struct QIOTaskThreadData *thread;
     48 };
     49 
     50 
     51 QIOTask *qio_task_new(Object *source,
     52                       QIOTaskFunc func,
     53                       gpointer opaque,
     54                       GDestroyNotify destroy)
     55 {
     56     QIOTask *task;
     57 
     58     task = g_new0(QIOTask, 1);
     59 
     60     task->source = source;
     61     object_ref(source);
     62     task->func = func;
     63     task->opaque = opaque;
     64     task->destroy = destroy;
     65     qemu_mutex_init(&task->thread_lock);
     66     qemu_cond_init(&task->thread_cond);
     67 
     68     trace_qio_task_new(task, source, func, opaque);
     69 
     70     return task;
     71 }
     72 
     73 static void qio_task_free(QIOTask *task)
     74 {
     75     qemu_mutex_lock(&task->thread_lock);
     76     if (task->thread) {
     77         if (task->thread->destroy) {
     78             task->thread->destroy(task->thread->opaque);
     79         }
     80 
     81         if (task->thread->context) {
     82             g_main_context_unref(task->thread->context);
     83         }
     84 
     85         g_free(task->thread);
     86     }
     87 
     88     if (task->destroy) {
     89         task->destroy(task->opaque);
     90     }
     91     if (task->destroyResult) {
     92         task->destroyResult(task->result);
     93     }
     94     if (task->err) {
     95         error_free(task->err);
     96     }
     97     object_unref(task->source);
     98 
     99     qemu_mutex_unlock(&task->thread_lock);
    100     qemu_mutex_destroy(&task->thread_lock);
    101     qemu_cond_destroy(&task->thread_cond);
    102 
    103     g_free(task);
    104 }
    105 
    106 
    107 static gboolean qio_task_thread_result(gpointer opaque)
    108 {
    109     QIOTask *task = opaque;
    110 
    111     trace_qio_task_thread_result(task);
    112     qio_task_complete(task);
    113 
    114     return FALSE;
    115 }
    116 
    117 
    118 static gpointer qio_task_thread_worker(gpointer opaque)
    119 {
    120     QIOTask *task = opaque;
    121 
    122     trace_qio_task_thread_run(task);
    123 
    124     task->thread->worker(task, task->thread->opaque);
    125 
    126     /* We're running in the background thread, and must only
    127      * ever report the task results in the main event loop
    128      * thread. So we schedule an idle callback to report
    129      * the worker results
    130      */
    131     trace_qio_task_thread_exit(task);
    132 
    133     qemu_mutex_lock(&task->thread_lock);
    134 
    135     task->thread->completion = g_idle_source_new();
    136     g_source_set_callback(task->thread->completion,
    137                           qio_task_thread_result, task, NULL);
    138     g_source_attach(task->thread->completion,
    139                     task->thread->context);
    140     g_source_unref(task->thread->completion);
    141     trace_qio_task_thread_source_attach(task, task->thread->completion);
    142 
    143     qemu_cond_signal(&task->thread_cond);
    144     qemu_mutex_unlock(&task->thread_lock);
    145 
    146     return NULL;
    147 }
    148 
    149 
    150 void qio_task_run_in_thread(QIOTask *task,
    151                             QIOTaskWorker worker,
    152                             gpointer opaque,
    153                             GDestroyNotify destroy,
    154                             GMainContext *context)
    155 {
    156     struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
    157     QemuThread thread;
    158 
    159     if (context) {
    160         g_main_context_ref(context);
    161     }
    162 
    163     data->worker = worker;
    164     data->opaque = opaque;
    165     data->destroy = destroy;
    166     data->context = context;
    167 
    168     task->thread = data;
    169 
    170     trace_qio_task_thread_start(task, worker, opaque);
    171     qemu_thread_create(&thread,
    172                        "io-task-worker",
    173                        qio_task_thread_worker,
    174                        task,
    175                        QEMU_THREAD_DETACHED);
    176 }
    177 
    178 
    179 void qio_task_wait_thread(QIOTask *task)
    180 {
    181     qemu_mutex_lock(&task->thread_lock);
    182     g_assert(task->thread != NULL);
    183     while (task->thread->completion == NULL) {
    184         qemu_cond_wait(&task->thread_cond, &task->thread_lock);
    185     }
    186 
    187     trace_qio_task_thread_source_cancel(task, task->thread->completion);
    188     g_source_destroy(task->thread->completion);
    189     qemu_mutex_unlock(&task->thread_lock);
    190 
    191     qio_task_thread_result(task);
    192 }
    193 
    194 
    195 void qio_task_complete(QIOTask *task)
    196 {
    197     task->func(task, task->opaque);
    198     trace_qio_task_complete(task);
    199     qio_task_free(task);
    200 }
    201 
    202 
    203 void qio_task_set_error(QIOTask *task,
    204                         Error *err)
    205 {
    206     error_propagate(&task->err, err);
    207 }
    208 
    209 
    210 bool qio_task_propagate_error(QIOTask *task,
    211                               Error **errp)
    212 {
    213     if (task->err) {
    214         error_propagate(errp, task->err);
    215         task->err = NULL;
    216         return true;
    217     }
    218 
    219     return false;
    220 }
    221 
    222 
    223 void qio_task_set_result_pointer(QIOTask *task,
    224                                  gpointer result,
    225                                  GDestroyNotify destroy)
    226 {
    227     task->result = result;
    228     task->destroyResult = destroy;
    229 }
    230 
    231 
    232 gpointer qio_task_get_result_pointer(QIOTask *task)
    233 {
    234     return task->result;
    235 }
    236 
    237 
    238 Object *qio_task_get_source(QIOTask *task)
    239 {
    240     return task->source;
    241 }