qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

engine.py (17443B)


      1 #
      2 # Migration test main engine
      3 #
      4 # Copyright (c) 2016 Red Hat, Inc.
      5 #
      6 # This library is free software; you can redistribute it and/or
      7 # modify it under the terms of the GNU Lesser General Public
      8 # License as published by the Free Software Foundation; either
      9 # version 2.1 of the License, or (at your option) any later version.
     10 #
     11 # This library is distributed in the hope that it will be useful,
     12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     14 # Lesser General Public License for more details.
     15 #
     16 # You should have received a copy of the GNU Lesser General Public
     17 # License along with this library; if not, see <http://www.gnu.org/licenses/>.
     18 #
     19 
     20 
     21 import os
     22 import re
     23 import sys
     24 import time
     25 
     26 from guestperf.progress import Progress, ProgressStats
     27 from guestperf.report import Report
     28 from guestperf.timings import TimingRecord, Timings
     29 
     30 sys.path.append(os.path.join(os.path.dirname(__file__),
     31                              '..', '..', '..', 'python'))
     32 from qemu.machine import QEMUMachine
     33 
     34 
     35 class Engine(object):
     36 
     37     def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
     38                  sleep=15, verbose=False, debug=False):
     39 
     40         self._binary = binary # Path to QEMU binary
     41         self._dst_host = dst_host # Hostname of target host
     42         self._kernel = kernel # Path to kernel image
     43         self._initrd = initrd # Path to stress initrd
     44         self._transport = transport # 'unix' or 'tcp' or 'rdma'
     45         self._sleep = sleep
     46         self._verbose = verbose
     47         self._debug = debug
     48 
     49         if debug:
     50             self._verbose = debug
     51 
     52     def _vcpu_timing(self, pid, tid_list):
     53         records = []
     54         now = time.time()
     55 
     56         jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
     57         for tid in tid_list:
     58             statfile = "/proc/%d/task/%d/stat" % (pid, tid)
     59             with open(statfile, "r") as fh:
     60                 stat = fh.readline()
     61                 fields = stat.split(" ")
     62                 stime = int(fields[13])
     63                 utime = int(fields[14])
     64                 records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
     65         return records
     66 
     67     def _cpu_timing(self, pid):
     68         now = time.time()
     69 
     70         jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
     71         statfile = "/proc/%d/stat" % pid
     72         with open(statfile, "r") as fh:
     73             stat = fh.readline()
     74             fields = stat.split(" ")
     75             stime = int(fields[13])
     76             utime = int(fields[14])
     77             return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)
     78 
     79     def _migrate_progress(self, vm):
     80         info = vm.command("query-migrate")
     81 
     82         if "ram" not in info:
     83             info["ram"] = {}
     84 
     85         return Progress(
     86             info.get("status", "active"),
     87             ProgressStats(
     88                 info["ram"].get("transferred", 0),
     89                 info["ram"].get("remaining", 0),
     90                 info["ram"].get("total", 0),
     91                 info["ram"].get("duplicate", 0),
     92                 info["ram"].get("skipped", 0),
     93                 info["ram"].get("normal", 0),
     94                 info["ram"].get("normal-bytes", 0),
     95                 info["ram"].get("dirty-pages-rate", 0),
     96                 info["ram"].get("mbps", 0),
     97                 info["ram"].get("dirty-sync-count", 0)
     98             ),
     99             time.time(),
    100             info.get("total-time", 0),
    101             info.get("downtime", 0),
    102             info.get("expected-downtime", 0),
    103             info.get("setup-time", 0),
    104             info.get("cpu-throttle-percentage", 0),
    105         )
    106 
    107     def _migrate(self, hardware, scenario, src, dst, connect_uri):
    108         src_qemu_time = []
    109         src_vcpu_time = []
    110         src_pid = src.get_pid()
    111 
    112         vcpus = src.command("query-cpus-fast")
    113         src_threads = []
    114         for vcpu in vcpus:
    115             src_threads.append(vcpu["thread-id"])
    116 
    117         # XXX how to get dst timings on remote host ?
    118 
    119         if self._verbose:
    120             print("Sleeping %d seconds for initial guest workload run" % self._sleep)
    121         sleep_secs = self._sleep
    122         while sleep_secs > 1:
    123             src_qemu_time.append(self._cpu_timing(src_pid))
    124             src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
    125             time.sleep(1)
    126             sleep_secs -= 1
    127 
    128         if self._verbose:
    129             print("Starting migration")
    130         if scenario._auto_converge:
    131             resp = src.command("migrate-set-capabilities",
    132                                capabilities = [
    133                                    { "capability": "auto-converge",
    134                                      "state": True }
    135                                ])
    136             resp = src.command("migrate-set-parameters",
    137                                cpu_throttle_increment=scenario._auto_converge_step)
    138 
    139         if scenario._post_copy:
    140             resp = src.command("migrate-set-capabilities",
    141                                capabilities = [
    142                                    { "capability": "postcopy-ram",
    143                                      "state": True }
    144                                ])
    145             resp = dst.command("migrate-set-capabilities",
    146                                capabilities = [
    147                                    { "capability": "postcopy-ram",
    148                                      "state": True }
    149                                ])
    150 
    151         resp = src.command("migrate-set-parameters",
    152                            max_bandwidth=scenario._bandwidth * 1024 * 1024)
    153 
    154         resp = src.command("migrate-set-parameters",
    155                            downtime_limit=scenario._downtime)
    156 
    157         if scenario._compression_mt:
    158             resp = src.command("migrate-set-capabilities",
    159                                capabilities = [
    160                                    { "capability": "compress",
    161                                      "state": True }
    162                                ])
    163             resp = src.command("migrate-set-parameters",
    164                                compress_threads=scenario._compression_mt_threads)
    165             resp = dst.command("migrate-set-capabilities",
    166                                capabilities = [
    167                                    { "capability": "compress",
    168                                      "state": True }
    169                                ])
    170             resp = dst.command("migrate-set-parameters",
    171                                decompress_threads=scenario._compression_mt_threads)
    172 
    173         if scenario._compression_xbzrle:
    174             resp = src.command("migrate-set-capabilities",
    175                                capabilities = [
    176                                    { "capability": "xbzrle",
    177                                      "state": True }
    178                                ])
    179             resp = dst.command("migrate-set-capabilities",
    180                                capabilities = [
    181                                    { "capability": "xbzrle",
    182                                      "state": True }
    183                                ])
    184             resp = src.command("migrate-set-parameters",
    185                                xbzrle_cache_size=(
    186                                    hardware._mem *
    187                                    1024 * 1024 * 1024 / 100 *
    188                                    scenario._compression_xbzrle_cache))
    189 
    190         if scenario._multifd:
    191             resp = src.command("migrate-set-capabilities",
    192                                capabilities = [
    193                                    { "capability": "multifd",
    194                                      "state": True }
    195                                ])
    196             resp = src.command("migrate-set-parameters",
    197                                multifd_channels=scenario._multifd_channels)
    198             resp = dst.command("migrate-set-capabilities",
    199                                capabilities = [
    200                                    { "capability": "multifd",
    201                                      "state": True }
    202                                ])
    203             resp = dst.command("migrate-set-parameters",
    204                                multifd_channels=scenario._multifd_channels)
    205 
    206         resp = src.command("migrate", uri=connect_uri)
    207 
    208         post_copy = False
    209         paused = False
    210 
    211         progress_history = []
    212 
    213         start = time.time()
    214         loop = 0
    215         while True:
    216             loop = loop + 1
    217             time.sleep(0.05)
    218 
    219             progress = self._migrate_progress(src)
    220             if (loop % 20) == 0:
    221                 src_qemu_time.append(self._cpu_timing(src_pid))
    222                 src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
    223 
    224             if (len(progress_history) == 0 or
    225                 (progress_history[-1]._ram._iterations <
    226                  progress._ram._iterations)):
    227                 progress_history.append(progress)
    228 
    229             if progress._status in ("completed", "failed", "cancelled"):
    230                 if progress._status == "completed" and paused:
    231                     dst.command("cont")
    232                 if progress_history[-1] != progress:
    233                     progress_history.append(progress)
    234 
    235                 if progress._status == "completed":
    236                     if self._verbose:
    237                         print("Sleeping %d seconds for final guest workload run" % self._sleep)
    238                     sleep_secs = self._sleep
    239                     while sleep_secs > 1:
    240                         time.sleep(1)
    241                         src_qemu_time.append(self._cpu_timing(src_pid))
    242                         src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
    243                         sleep_secs -= 1
    244 
    245                 return [progress_history, src_qemu_time, src_vcpu_time]
    246 
    247             if self._verbose and (loop % 20) == 0:
    248                 print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
    249                     progress._ram._iterations,
    250                     progress._ram._remaining_bytes / (1024 * 1024),
    251                     progress._ram._total_bytes / (1024 * 1024),
    252                     progress._ram._transferred_bytes / (1024 * 1024),
    253                     progress._ram._transfer_rate_mbs,
    254                 ))
    255 
    256             if progress._ram._iterations > scenario._max_iters:
    257                 if self._verbose:
    258                     print("No completion after %d iterations over RAM" % scenario._max_iters)
    259                 src.command("migrate_cancel")
    260                 continue
    261 
    262             if time.time() > (start + scenario._max_time):
    263                 if self._verbose:
    264                     print("No completion after %d seconds" % scenario._max_time)
    265                 src.command("migrate_cancel")
    266                 continue
    267 
    268             if (scenario._post_copy and
    269                 progress._ram._iterations >= scenario._post_copy_iters and
    270                 not post_copy):
    271                 if self._verbose:
    272                     print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
    273                 resp = src.command("migrate-start-postcopy")
    274                 post_copy = True
    275 
    276             if (scenario._pause and
    277                 progress._ram._iterations >= scenario._pause_iters and
    278                 not paused):
    279                 if self._verbose:
    280                     print("Pausing VM after %d iterations" % scenario._pause_iters)
    281                 resp = src.command("stop")
    282                 paused = True
    283 
    284     def _get_common_args(self, hardware, tunnelled=False):
    285         args = [
    286             "noapic",
    287             "edd=off",
    288             "printk.time=1",
    289             "noreplace-smp",
    290             "cgroup_disable=memory",
    291             "pci=noearly",
    292             "console=ttyS0",
    293         ]
    294         if self._debug:
    295             args.append("debug")
    296         else:
    297             args.append("quiet")
    298 
    299         args.append("ramsize=%s" % hardware._mem)
    300 
    301         cmdline = " ".join(args)
    302         if tunnelled:
    303             cmdline = "'" + cmdline + "'"
    304 
    305         argv = [
    306             "-accel", "kvm",
    307             "-cpu", "host",
    308             "-kernel", self._kernel,
    309             "-initrd", self._initrd,
    310             "-append", cmdline,
    311             "-chardev", "stdio,id=cdev0",
    312             "-device", "isa-serial,chardev=cdev0",
    313             "-m", str((hardware._mem * 1024) + 512),
    314             "-smp", str(hardware._cpus),
    315         ]
    316 
    317         if self._debug:
    318             argv.extend(["-device", "sga"])
    319 
    320         if hardware._prealloc_pages:
    321             argv_source += ["-mem-path", "/dev/shm",
    322                             "-mem-prealloc"]
    323         if hardware._locked_pages:
    324             argv_source += ["-overcommit", "mem-lock=on"]
    325         if hardware._huge_pages:
    326             pass
    327 
    328         return argv
    329 
    330     def _get_src_args(self, hardware):
    331         return self._get_common_args(hardware)
    332 
    333     def _get_dst_args(self, hardware, uri):
    334         tunnelled = False
    335         if self._dst_host != "localhost":
    336             tunnelled = True
    337         argv = self._get_common_args(hardware, tunnelled)
    338         return argv + ["-incoming", uri]
    339 
    340     @staticmethod
    341     def _get_common_wrapper(cpu_bind, mem_bind):
    342         wrapper = []
    343         if len(cpu_bind) > 0 or len(mem_bind) > 0:
    344             wrapper.append("numactl")
    345             if cpu_bind:
    346                 wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
    347             if mem_bind:
    348                 wrapper.append("--membind=%s" % ",".join(mem_bind))
    349 
    350         return wrapper
    351 
    352     def _get_src_wrapper(self, hardware):
    353         return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)
    354 
    355     def _get_dst_wrapper(self, hardware):
    356         wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
    357         if self._dst_host != "localhost":
    358             return ["ssh",
    359                     "-R", "9001:localhost:9001",
    360                     self._dst_host] + wrapper
    361         else:
    362             return wrapper
    363 
    364     def _get_timings(self, vm):
    365         log = vm.get_log()
    366         if not log:
    367             return []
    368         if self._debug:
    369             print(log)
    370 
    371         regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
    372         matcher = re.compile(regex)
    373         records = []
    374         for line in log.split("\n"):
    375             match = matcher.match(line)
    376             if match:
    377                 records.append(TimingRecord(int(match.group(1)),
    378                                             int(match.group(2)) / 1000.0,
    379                                             int(match.group(3))))
    380         return records
    381 
    382     def run(self, hardware, scenario, result_dir=os.getcwd()):
    383         abs_result_dir = os.path.join(result_dir, scenario._name)
    384 
    385         if self._transport == "tcp":
    386             uri = "tcp:%s:9000" % self._dst_host
    387         elif self._transport == "rdma":
    388             uri = "rdma:%s:9000" % self._dst_host
    389         elif self._transport == "unix":
    390             if self._dst_host != "localhost":
    391                 raise Exception("Running use unix migration transport for non-local host")
    392             uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
    393             try:
    394                 os.remove(uri[5:])
    395                 os.remove(monaddr)
    396             except:
    397                 pass
    398 
    399         if self._dst_host != "localhost":
    400             dstmonaddr = ("localhost", 9001)
    401         else:
    402             dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
    403         srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()
    404 
    405         src = QEMUMachine(self._binary,
    406                           args=self._get_src_args(hardware),
    407                           wrapper=self._get_src_wrapper(hardware),
    408                           name="qemu-src-%d" % os.getpid(),
    409                           monitor_address=srcmonaddr)
    410 
    411         dst = QEMUMachine(self._binary,
    412                           args=self._get_dst_args(hardware, uri),
    413                           wrapper=self._get_dst_wrapper(hardware),
    414                           name="qemu-dst-%d" % os.getpid(),
    415                           monitor_address=dstmonaddr)
    416 
    417         try:
    418             src.launch()
    419             dst.launch()
    420 
    421             ret = self._migrate(hardware, scenario, src, dst, uri)
    422             progress_history = ret[0]
    423             qemu_timings = ret[1]
    424             vcpu_timings = ret[2]
    425             if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
    426                 os.remove(uri[5:])
    427 
    428             if os.path.exists(srcmonaddr):
    429                 os.remove(srcmonaddr)
    430 
    431             if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
    432                 os.remove(dstmonaddr)
    433 
    434             if self._verbose:
    435                 print("Finished migration")
    436 
    437             src.shutdown()
    438             dst.shutdown()
    439 
    440             return Report(hardware, scenario, progress_history,
    441                           Timings(self._get_timings(src) + self._get_timings(dst)),
    442                           Timings(qemu_timings),
    443                           Timings(vcpu_timings),
    444                           self._binary, self._dst_host, self._kernel,
    445                           self._initrd, self._transport, self._sleep)
    446         except Exception as e:
    447             if self._debug:
    448                 print("Failed: %s" % str(e))
    449             try:
    450                 src.shutdown()
    451             except:
    452                 pass
    453             try:
    454                 dst.shutdown()
    455             except:
    456                 pass
    457 
    458             if self._debug:
    459                 print(src.get_log())
    460                 print(dst.get_log())
    461             raise
    462