mirror of https://gitlab.com/qemu-project/qemu
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
108 lines
3.0 KiB
Python
108 lines
3.0 KiB
Python
# SPDX-License-Identifier: GPL-2.0-or-later
|
|
#
|
|
# Utilities for python-based QEMU tests
|
|
#
|
|
# Copyright 2024 Red Hat, Inc.
|
|
#
|
|
# Authors:
|
|
# Thomas Huth <thuth@redhat.com>
|
|
|
|
import gzip
|
|
import lzma
|
|
import os
|
|
import stat
|
|
import shutil
|
|
from urllib.parse import urlparse
|
|
from subprocess import check_call, CalledProcessError
|
|
|
|
from .asset import Asset
|
|
|
|
|
|
def gzip_uncompress(gz_path, output_path):
|
|
if os.path.exists(output_path):
|
|
return
|
|
with gzip.open(gz_path, 'rb') as gz_in:
|
|
try:
|
|
with open(output_path, 'wb') as raw_out:
|
|
shutil.copyfileobj(gz_in, raw_out)
|
|
except:
|
|
os.remove(output_path)
|
|
raise
|
|
|
|
def lzma_uncompress(xz_path, output_path):
|
|
if os.path.exists(output_path):
|
|
return
|
|
with lzma.open(xz_path, 'rb') as lzma_in:
|
|
try:
|
|
with open(output_path, 'wb') as raw_out:
|
|
shutil.copyfileobj(lzma_in, raw_out)
|
|
except:
|
|
os.remove(output_path)
|
|
raise
|
|
|
|
|
|
def zstd_uncompress(zstd_path, output_path):
|
|
if os.path.exists(output_path):
|
|
return
|
|
|
|
try:
|
|
check_call(['zstd', "-f", "-d", zstd_path,
|
|
"-o", output_path])
|
|
except CalledProcessError as e:
|
|
os.remove(output_path)
|
|
raise Exception(
|
|
f"Unable to decompress zstd file {zstd_path} with {e}") from e
|
|
|
|
# zstd copies source archive permissions for the output
|
|
# file, so must make this writable for QEMU
|
|
os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR)
|
|
|
|
|
|
'''
|
|
@params compressed: filename, Asset, or file-like object to uncompress
|
|
@params uncompressed: filename to uncompress into
|
|
@params format: optional compression format (gzip, lzma)
|
|
|
|
Uncompresses @compressed into @uncompressed
|
|
|
|
If @format is None, heuristics will be applied to guess the format
|
|
from the filename or Asset URL. @format must be non-None if @uncompressed
|
|
is a file-like object.
|
|
|
|
Returns the fully qualified path to the uncompessed file
|
|
'''
|
|
def uncompress(compressed, uncompressed, format=None):
|
|
if format is None:
|
|
format = guess_uncompress_format(compressed)
|
|
|
|
if format == "xz":
|
|
lzma_uncompress(str(compressed), uncompressed)
|
|
elif format == "gz":
|
|
gzip_uncompress(str(compressed), uncompressed)
|
|
elif format == "zstd":
|
|
zstd_uncompress(str(compressed), uncompressed)
|
|
else:
|
|
raise Exception(f"Unknown compression format {format}")
|
|
|
|
'''
|
|
@params compressed: filename, Asset, or file-like object to guess
|
|
|
|
Guess the format of @compressed, raising an exception if
|
|
no format can be determined
|
|
'''
|
|
def guess_uncompress_format(compressed):
|
|
if type(compressed) == Asset:
|
|
compressed = urlparse(compressed.url).path
|
|
elif type(compressed) != str:
|
|
raise Exception(f"Unable to guess compression cformat for {compressed}")
|
|
|
|
(name, ext) = os.path.splitext(compressed)
|
|
if ext == ".xz":
|
|
return "xz"
|
|
elif ext == ".gz":
|
|
return "gz"
|
|
elif ext in [".zstd", ".zst"]:
|
|
return 'zstd'
|
|
else:
|
|
raise Exception(f"Unknown compression format for {compressed}")
|