mirror of https://gitlab.com/qemu-project/qemu
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
118 lines
3.7 KiB
Python
118 lines
3.7 KiB
Python
# SPDX-License-Identifier: GPL-2.0-or-later
|
|
#
|
|
# Utilities for python-based QEMU tests
|
|
#
|
|
# Copyright 2024 Red Hat, Inc.
|
|
#
|
|
# Authors:
|
|
# Thomas Huth <thuth@redhat.com>
|
|
|
|
import os
|
|
from subprocess import check_call, run, DEVNULL
|
|
import tarfile
|
|
from urllib.parse import urlparse
|
|
import zipfile
|
|
|
|
from .asset import Asset
|
|
|
|
|
|
def tar_extract(archive, dest_dir, member=None):
    '''
    @params archive: filename or file-like object of the tar archive
    @params dest_dir: target directory to extract into
    @params member: optional member file to limit extraction to

    Extracts the tar @archive into @dest_dir. All files are extracted
    unless @member specifies a limit.
    '''
    # tarfile.open() accepts paths through its first argument, but
    # file-like objects must be handed over via 'fileobj'; passing one
    # positionally makes tarfile try to open() it as a path and fail.
    if isinstance(archive, (str, bytes, os.PathLike)):
        open_args = {'name': archive}
    else:
        open_args = {'fileobj': archive}

    with tarfile.open(**open_args) as tf:
        # Use the 'data' extraction filter whenever the running
        # Python's tarfile module provides it.
        if hasattr(tarfile, 'data_filter'):
            tf.extraction_filter = tarfile.data_filter
        if member:
            tf.extract(member=member, path=dest_dir)
        else:
            tf.extractall(path=dest_dir)
|
|
|
|
def cpio_extract(archive, output_path):
    '''
    @params archive: filename or file-like object of the cpio archive
    @params output_path: directory to extract into

    Extracts all of @archive into @output_path by running the 'cpio'
    tool with @output_path as the working directory. Since the
    directory change happens before cpio runs, a relative @archive
    filename is resolved relative to @output_path.
    '''
    cwd = os.getcwd()
    os.chdir(output_path)
    try:
        # Not passing 'check=True' as cpio exits with non-zero
        # status if the archive contains any device nodes :-(
        if isinstance(archive, str):
            run(['cpio', '-i', '-F', archive],
                stdout=DEVNULL, stderr=DEVNULL)
        else:
            # File-like object: feed its content to cpio on stdin
            run(['cpio', '-i'],
                input=archive.read(),
                stdout=DEVNULL, stderr=DEVNULL)
    finally:
        # Always restore the caller's working directory, even when
        # launching cpio or reading @archive raises.
        os.chdir(cwd)
|
|
|
|
def zip_extract(archive, dest_dir, member=None):
    '''
    @params archive: filename or file-like object of the zip archive
    @params dest_dir: target directory to extract into
    @params member: optional member file to limit extraction to

    Extracts the zip @archive into @dest_dir. All files are extracted
    unless @member specifies a limit.
    '''
    with zipfile.ZipFile(archive, 'r') as zip_file:
        if not member:
            # No limit requested: unpack the whole archive
            zip_file.extractall(path=dest_dir)
        else:
            zip_file.extract(member=member, path=dest_dir)
|
|
|
|
def deb_extract(archive, dest_dir, member=None):
    '''
    @params archive: filename of the deb package
    @params dest_dir: target directory to extract into
    @params member: optional member file to limit extraction to

    Unpacks the third 'ar' member of @archive into @dest_dir and then
    extracts that member as a tar archive, also into @dest_dir. All
    files of the tarball are extracted unless @member specifies a limit.
    The working directory is temporarily switched to @dest_dir and
    restored afterwards, even on failure.
    '''
    orig_dir = os.getcwd()
    os.chdir(dest_dir)
    try:
        listing = run(['ar', 't', archive],
                      check=True, capture_output=True, encoding='utf8')
        ar_members = listing.stdout.split()
        # Third entry of the 'ar' listing; presumably the data.tar.*
        # payload, following the usual .deb member order.
        data_tarball = ar_members[2]
        # 'ar x' drops the member into the current directory (@dest_dir)
        check_call(['ar', 'x', archive, data_tarball],
                   stdout=DEVNULL, stderr=DEVNULL)
        tar_extract(data_tarball, dest_dir, member)
    finally:
        os.chdir(orig_dir)
|
|
|
|
def archive_extract(archive, dest_dir, format=None, member=None):
    '''
    @params archive: filename, Asset, or file-like object to extract
    @params dest_dir: target directory to extract into
    @params format: optional archive format, one of "tar", "zip",
                    "cpio" or "deb"
    @params member: optional member file to limit extraction to

    Extracts @archive into @dest_dir. All files are extracted
    unless @member specifies a limit.

    If @format is None, heuristics will be applied to guess the format
    from the filename or Asset URL. @format must be non-None if @archive
    is a file-like object.

    Raises an Exception if the format is unknown, if @member is given
    for a cpio archive, or if a deb archive is not given as a filename.
    '''
    if format is None:
        format = guess_archive_format(archive)
    # Guess the format from the Asset first (it uses the URL), then
    # continue with the Asset's string form for the extraction itself
    if isinstance(archive, Asset):
        archive = str(archive)

    if format == "tar":
        tar_extract(archive, dest_dir, member)
    elif format == "zip":
        zip_extract(archive, dest_dir, member)
    elif format == "cpio":
        if member is not None:
            raise Exception("Unable to filter cpio extraction")
        cpio_extract(archive, dest_dir)
    elif format == "deb":
        if not isinstance(archive, str):
            raise Exception("Unable to use file-like object with deb archives")
        # deb_extract is handed the member with a "./" prefix; only
        # build that name when a member was actually requested, so the
        # default member=None extracts everything instead of raising
        # a TypeError on the string concatenation.
        deb_member = None if member is None else "./" + member
        deb_extract(archive, dest_dir, deb_member)
    else:
        raise Exception(f"Unknown archive format {format}")
|
|
|
|
def guess_archive_format(archive):
    '''
    @params archive: filename, or Asset to guess

    Guess the format of @archive, raising an exception if
    no format can be determined. For an Asset the path component
    of its URL is inspected; for a string the string itself.

    Returns one of "tar", "zip", "cpio" or "deb".
    '''
    if isinstance(archive, str):
        name = archive
    elif isinstance(archive, Asset):
        name = urlparse(archive.url).path
    else:
        raise Exception(f"Unable to guess archive format for {archive}")

    # ".tar." covers compressed tarballs such as .tar.gz / .tar.xz;
    # a plain ".tar" suffix covers uncompressed ones. The "tgz" check
    # is kept without a leading dot to stay compatible with names that
    # were accepted before.
    if ".tar." in name or name.endswith((".tar", "tgz")):
        return "tar"
    elif name.endswith(".zip"):
        return "zip"
    elif name.endswith(".cpio"):
        return "cpio"
    elif name.endswith((".deb", ".udeb")):
        return "deb"
    else:
        raise Exception(f"Unknown archive format for {name}")
|