engine.py (17443B)
#
# Migration test main engine
#
# Copyright (c) 2016 Red Hat, Inc.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <http://www.gnu.org/licenses/>.
#


import os
import re
import sys
import time

from guestperf.progress import Progress, ProgressStats
from guestperf.report import Report
from guestperf.timings import TimingRecord, Timings

sys.path.append(os.path.join(os.path.dirname(__file__),
                             '..', '..', '..', 'python'))
from qemu.machine import QEMUMachine


class Engine(object):
    """Drive a single migration performance measurement run.

    Launches a source and destination QEMU, runs the guest stress
    workload, performs the migration described by a scenario object,
    samples CPU usage and migration progress along the way, and
    produces a Report.
    """

    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
                 sleep=15, verbose=False, debug=False):
        """Prepare an engine.

        :param binary: path to the QEMU binary
        :param dst_host: hostname of the migration target host
        :param kernel: path to the kernel image
        :param initrd: path to the stress initrd
        :param transport: migration transport, 'unix', 'tcp' or 'rdma'
        :param sleep: seconds of guest workload before/after migration
        :param verbose: print progress information
        :param debug: print debug information (implies verbose)
        """

        self._binary = binary          # Path to QEMU binary
        self._dst_host = dst_host      # Hostname of target host
        self._kernel = kernel          # Path to kernel image
        self._initrd = initrd          # Path to stress initrd
        self._transport = transport    # 'unix' or 'tcp' or 'rdma'
        self._sleep = sleep
        self._verbose = verbose
        self._debug = debug

        # Debug output implies verbose output too
        if debug:
            self._verbose = debug

    @staticmethod
    def _read_proc_stat_times(statfile):
        """Return (utime, stime) jiffy counters from a /proc stat file.

        The comm field (field 2 of proc(5)) may itself contain spaces
        and parentheses — QEMU names vCPU threads e.g. "CPU 0/KVM" —
        so a naive split(" ") mis-indexes the later fields.  Split on
        the *last* closing paren instead; the remainder starts at the
        state field (field 3), so utime/stime (fields 14/15) are at
        indexes 11/12.
        """
        with open(statfile, "r") as fh:
            stat = fh.readline()
        fields = stat.rpartition(")")[2].split()
        utime = int(fields[11])
        stime = int(fields[12])
        return utime, stime

    def _vcpu_timing(self, pid, tid_list):
        """Sample cumulative CPU time of each vCPU thread.

        :param pid: PID of the source QEMU process
        :param tid_list: list of vCPU thread IDs
        :returns: list of TimingRecord, one per thread, with the
                  cumulative user+system CPU time in milliseconds
        """
        records = []
        now = time.time()

        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        for tid in tid_list:
            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
            utime, stime = self._read_proc_stat_times(statfile)
            records.append(TimingRecord(
                tid, now, 1000 * (stime + utime) / jiffies_per_sec))
        return records

    def _cpu_timing(self, pid):
        """Sample cumulative CPU time of the whole QEMU process.

        :param pid: PID of the QEMU process
        :returns: a TimingRecord with the cumulative user+system CPU
                  time in milliseconds
        """
        now = time.time()

        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        utime, stime = self._read_proc_stat_times("/proc/%d/stat" % pid)
        return TimingRecord(
            pid, now, 1000 * (stime + utime) / jiffies_per_sec)

    def _migrate_progress(self, vm):
        """Query migration state from a VM and wrap it in a Progress.

        Missing fields default to 0 / "active" since QEMU omits
        statistics that are not relevant to the current phase.
        """
        info = vm.command("query-migrate")

        if "ram" not in info:
            info["ram"] = {}

        return Progress(
            info.get("status", "active"),
            ProgressStats(
                info["ram"].get("transferred", 0),
                info["ram"].get("remaining", 0),
                info["ram"].get("total", 0),
                info["ram"].get("duplicate", 0),
                info["ram"].get("skipped", 0),
                info["ram"].get("normal", 0),
                info["ram"].get("normal-bytes", 0),
                info["ram"].get("dirty-pages-rate", 0),
                info["ram"].get("mbps", 0),
                info["ram"].get("dirty-sync-count", 0)
            ),
            time.time(),
            info.get("total-time", 0),
            info.get("downtime", 0),
            info.get("expected-downtime", 0),
            info.get("setup-time", 0),
            info.get("cpu-throttle-percentage", 0),
        )

    def _migrate(self, hardware, scenario, src, dst, connect_uri):
        """Run the migration and collect progress/CPU samples.

        :param hardware: Hardware description object
        :param scenario: Scenario description object
        :param src: source QEMUMachine
        :param dst: destination QEMUMachine
        :param connect_uri: migration URI passed to the 'migrate' command
        :returns: [progress_history, src_qemu_time, src_vcpu_time]
        """
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        vcpus = src.command("query-cpus-fast")
        src_threads = []
        for vcpu in vcpus:
            src_threads.append(vcpu["thread-id"])

        # XXX how to get dst timings on remote host ?

        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
        # Let the guest workload warm up, sampling CPU usage once a second
        sleep_secs = self._sleep
        while sleep_secs > 1:
            src_qemu_time.append(self._cpu_timing(src_pid))
            src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")
        if scenario._auto_converge:
            resp = src.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "auto-converge",
                                     "state": True }
                               ])
            resp = src.command("migrate-set-parameters",
                               cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            resp = src.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "postcopy-ram",
                                     "state": True }
                               ])
            resp = dst.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "postcopy-ram",
                                     "state": True }
                               ])

        resp = src.command("migrate-set-parameters",
                           max_bandwidth=scenario._bandwidth * 1024 * 1024)

        resp = src.command("migrate-set-parameters",
                           downtime_limit=scenario._downtime)

        if scenario._compression_mt:
            resp = src.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "compress",
                                     "state": True }
                               ])
            resp = src.command("migrate-set-parameters",
                               compress_threads=scenario._compression_mt_threads)
            resp = dst.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "compress",
                                     "state": True }
                               ])
            resp = dst.command("migrate-set-parameters",
                               decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            resp = src.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "xbzrle",
                                     "state": True }
                               ])
            resp = dst.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "xbzrle",
                                     "state": True }
                               ])
            # Cache size is a QMP 'size' (integer) parameter; true
            # division would produce a float under Python 3, so
            # truncate explicitly.
            resp = src.command("migrate-set-parameters",
                               xbzrle_cache_size=int(
                                   hardware._mem *
                                   1024 * 1024 * 1024 / 100 *
                                   scenario._compression_xbzrle_cache))

        if scenario._multifd:
            resp = src.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "multifd",
                                     "state": True }
                               ])
            resp = src.command("migrate-set-parameters",
                               multifd_channels=scenario._multifd_channels)
            resp = dst.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "multifd",
                                     "state": True }
                               ])
            resp = dst.command("migrate-set-parameters",
                               multifd_channels=scenario._multifd_channels)

        resp = src.command("migrate", uri=connect_uri)

        post_copy = False
        paused = False

        progress_history = []

        start = time.time()
        loop = 0
        while True:
            loop = loop + 1
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            # Sample CPU usage roughly once a second (20 * 0.05s)
            if (loop % 20) == 0:
                src_qemu_time.append(self._cpu_timing(src_pid))
                src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))

            # Record progress once per RAM iteration
            if (len(progress_history) == 0 or
                (progress_history[-1]._ram._iterations <
                 progress._ram._iterations)):
                progress_history.append(progress)

            if progress._status in ("completed", "failed", "cancelled"):
                if progress._status == "completed" and paused:
                    dst.command("cont")
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
                    sleep_secs = self._sleep
                    while sleep_secs > 1:
                        time.sleep(1)
                        src_qemu_time.append(self._cpu_timing(src_pid))
                        src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
                        sleep_secs -= 1

                return [progress_history, src_qemu_time, src_vcpu_time]

            if self._verbose and (loop % 20) == 0:
                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            # Give up if the scenario's iteration / time budget is
            # exhausted; the cancel is observed via progress._status
            # on a later loop iteration.
            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM" % scenario._max_iters)
                src.command("migrate_cancel")
                continue

            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds" % scenario._max_time)
                src.command("migrate_cancel")
                continue

            if (scenario._post_copy and
                progress._ram._iterations >= scenario._post_copy_iters and
                not post_copy):
                if self._verbose:
                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
                resp = src.command("migrate-start-postcopy")
                post_copy = True

            if (scenario._pause and
                progress._ram._iterations >= scenario._pause_iters and
                not paused):
                if self._verbose:
                    print("Pausing VM after %d iterations" % scenario._pause_iters)
                resp = src.command("stop")
                paused = True

    def _get_common_args(self, hardware, tunnelled=False):
        """Build the QEMU argv shared by source and destination.

        :param hardware: Hardware description object
        :param tunnelled: quote the kernel command line for passing
                          through an extra shell (ssh to remote dst)
        :returns: list of QEMU command line arguments
        """
        args = [
            "noapic",
            "edd=off",
            "printk.time=1",
            "noreplace-smp",
            "cgroup_disable=memory",
            "pci=noearly",
            "console=ttyS0",
        ]
        if self._debug:
            args.append("debug")
        else:
            args.append("quiet")

        args.append("ramsize=%s" % hardware._mem)

        cmdline = " ".join(args)
        if tunnelled:
            cmdline = "'" + cmdline + "'"

        argv = [
            "-accel", "kvm",
            "-cpu", "host",
            "-kernel", self._kernel,
            "-initrd", self._initrd,
            "-append", cmdline,
            "-chardev", "stdio,id=cdev0",
            "-device", "isa-serial,chardev=cdev0",
            "-m", str((hardware._mem * 1024) + 512),
            "-smp", str(hardware._cpus),
        ]

        if self._debug:
            argv.extend(["-device", "sga"])

        # BUG FIX: these used to append to an undefined 'argv_source',
        # raising NameError whenever prealloc/locked pages were enabled
        if hardware._prealloc_pages:
            argv += ["-mem-path", "/dev/shm",
                     "-mem-prealloc"]
        if hardware._locked_pages:
            argv += ["-overcommit", "mem-lock=on"]
        if hardware._huge_pages:
            pass

        return argv

    def _get_src_args(self, hardware):
        """Build the QEMU argv for the migration source."""
        return self._get_common_args(hardware)

    def _get_dst_args(self, hardware, uri):
        """Build the QEMU argv for the migration destination.

        The command line is tunnelled through ssh when the destination
        is remote, so it needs an extra level of quoting.
        """
        tunnelled = False
        if self._dst_host != "localhost":
            tunnelled = True
        argv = self._get_common_args(hardware, tunnelled)
        return argv + ["-incoming", uri]

    @staticmethod
    def _get_common_wrapper(cpu_bind, mem_bind):
        """Build a numactl wrapper command for CPU/memory binding.

        :returns: [] when no binding is requested
        """
        wrapper = []
        if len(cpu_bind) > 0 or len(mem_bind) > 0:
            wrapper.append("numactl")
            if cpu_bind:
                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
            if mem_bind:
                wrapper.append("--membind=%s" % ",".join(mem_bind))

        return wrapper

    def _get_src_wrapper(self, hardware):
        """Wrapper command for launching the source QEMU."""
        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)

    def _get_dst_wrapper(self, hardware):
        """Wrapper command for launching the destination QEMU.

        A remote destination is launched over ssh, with the monitor
        port (9001) tunnelled back to the local host.
        """
        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
        if self._dst_host != "localhost":
            return ["ssh",
                    "-R", "9001:localhost:9001",
                    self._dst_host] + wrapper
        else:
            return wrapper

    def _get_timings(self, vm):
        """Parse stress workload INFO lines from a VM's console log.

        :returns: list of TimingRecord (pid, timestamp secs, copy ms)
        """
        log = vm.get_log()
        if not log:
            return []
        if self._debug:
            print(log)

        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
        matcher = re.compile(regex)
        records = []
        for line in log.split("\n"):
            match = matcher.match(line)
            if match:
                records.append(TimingRecord(int(match.group(1)),
                                            int(match.group(2)) / 1000.0,
                                            int(match.group(3))))
        return records

    def run(self, hardware, scenario, result_dir=None):
        """Execute one scenario and return its Report.

        :param hardware: Hardware description object
        :param scenario: Scenario description object
        :param result_dir: base directory for results; defaults to the
                           current working directory at call time (the
                           old 'os.getcwd()' default argument was
                           evaluated once at class-definition time)
        :raises Exception: on unusable transport configuration; QEMU
                           launch/migration errors propagate
        """
        if result_dir is None:
            result_dir = os.getcwd()
        abs_result_dir = os.path.join(result_dir, scenario._name)

        if self._transport == "tcp":
            uri = "tcp:%s:9000" % self._dst_host
        elif self._transport == "rdma":
            uri = "rdma:%s:9000" % self._dst_host
        elif self._transport == "unix":
            if self._dst_host != "localhost":
                raise Exception("Running use unix migration transport for non-local host")
            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
            # Remove any stale socket from a previous run.  The old code
            # also tried to remove an undefined 'monaddr' variable here,
            # which always raised NameError (masked by a bare except).
            try:
                os.remove(uri[5:])
            except OSError:
                pass
        else:
            raise Exception("Unknown migration transport '%s'" % self._transport)

        if self._dst_host != "localhost":
            dstmonaddr = ("localhost", 9001)
        else:
            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()

        src = QEMUMachine(self._binary,
                          args=self._get_src_args(hardware),
                          wrapper=self._get_src_wrapper(hardware),
                          name="qemu-src-%d" % os.getpid(),
                          monitor_address=srcmonaddr)

        dst = QEMUMachine(self._binary,
                          args=self._get_dst_args(hardware, uri),
                          wrapper=self._get_dst_wrapper(hardware),
                          name="qemu-dst-%d" % os.getpid(),
                          monitor_address=dstmonaddr)

        try:
            src.launch()
            dst.launch()

            ret = self._migrate(hardware, scenario, src, dst, uri)
            progress_history = ret[0]
            qemu_timings = ret[1]
            vcpu_timings = ret[2]
            if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
                os.remove(uri[5:])

            if os.path.exists(srcmonaddr):
                os.remove(srcmonaddr)

            if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
                os.remove(dstmonaddr)

            if self._verbose:
                print("Finished migration")

            src.shutdown()
            dst.shutdown()

            return Report(hardware, scenario, progress_history,
                          Timings(self._get_timings(src) + self._get_timings(dst)),
                          Timings(qemu_timings),
                          Timings(vcpu_timings),
                          self._binary, self._dst_host, self._kernel,
                          self._initrd, self._transport, self._sleep)
        except Exception as e:
            if self._debug:
                print("Failed: %s" % str(e))
            # Best-effort teardown; never let cleanup mask the original
            # failure being re-raised below
            try:
                src.shutdown()
            except Exception:
                pass
            try:
                dst.shutdown()
            except Exception:
                pass

            if self._debug:
                print(src.get_log())
                print(dst.get_log())
            raise