You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
qemu/tests/functional/qemu_test/uncompress.py

108 lines
3.0 KiB
Python

# SPDX-License-Identifier: GPL-2.0-or-later
#
# Utilities for python-based QEMU tests
#
# Copyright 2024 Red Hat, Inc.
#
# Authors:
# Thomas Huth <thuth@redhat.com>
import gzip
import lzma
import os
import stat
import shutil
from urllib.parse import urlparse
from subprocess import check_call, CalledProcessError
from .asset import Asset
def gzip_uncompress(gz_path, output_path):
if os.path.exists(output_path):
return
with gzip.open(gz_path, 'rb') as gz_in:
try:
with open(output_path, 'wb') as raw_out:
shutil.copyfileobj(gz_in, raw_out)
except:
os.remove(output_path)
raise
def lzma_uncompress(xz_path, output_path):
if os.path.exists(output_path):
return
with lzma.open(xz_path, 'rb') as lzma_in:
try:
with open(output_path, 'wb') as raw_out:
shutil.copyfileobj(lzma_in, raw_out)
except:
os.remove(output_path)
raise
def zstd_uncompress(zstd_path, output_path):
if os.path.exists(output_path):
return
try:
check_call(['zstd', "-f", "-d", zstd_path,
"-o", output_path])
except CalledProcessError as e:
os.remove(output_path)
raise Exception(
f"Unable to decompress zstd file {zstd_path} with {e}") from e
# zstd copies source archive permissions for the output
# file, so must make this writable for QEMU
os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR)
'''
@params compressed: filename, Asset, or file-like object to uncompress
@params uncompressed: filename to uncompress into
@params format: optional compression format (gzip, lzma)
Uncompresses @compressed into @uncompressed
If @format is None, heuristics will be applied to guess the format
from the filename or Asset URL. @format must be non-None if @uncompressed
is a file-like object.
Returns the fully qualified path to the uncompessed file
'''
def uncompress(compressed, uncompressed, format=None):
if format is None:
format = guess_uncompress_format(compressed)
if format == "xz":
lzma_uncompress(str(compressed), uncompressed)
elif format == "gz":
gzip_uncompress(str(compressed), uncompressed)
elif format == "zstd":
zstd_uncompress(str(compressed), uncompressed)
else:
raise Exception(f"Unknown compression format {format}")
'''
@params compressed: filename, Asset, or file-like object to guess
Guess the format of @compressed, raising an exception if
no format can be determined
'''
def guess_uncompress_format(compressed):
if type(compressed) == Asset:
compressed = urlparse(compressed.url).path
elif type(compressed) != str:
raise Exception(f"Unable to guess compression cformat for {compressed}")
(name, ext) = os.path.splitext(compressed)
if ext == ".xz":
return "xz"
elif ext == ".gz":
return "gz"
elif ext in [".zstd", ".zst"]:
return 'zstd'
else:
raise Exception(f"Unknown compression format for {compressed}")