libjxl

FORK: libjxl patches used on blog
git clone https://git.neptards.moe/blog/libjxl.git
Log | Files | Refs | Submodules | README | LICENSE

conformance.py (10347B)


      1 #!/usr/bin/env python3
      2 # Copyright (c) the JPEG XL Project Authors. All rights reserved.
      3 #
      4 # Use of this source code is governed by a BSD-style
      5 # license that can be found in the LICENSE file.
      6 """JPEG XL conformance test runner.
      7 
      8 Tool to perform a conformance test for a decoder.
      9 """
     10 
     11 import argparse
     12 import json
     13 import numpy
     14 import os
     15 import shutil
     16 import subprocess
     17 import sys
     18 import tempfile
     19 
     20 import lcms2
     21 
     22 def Failure(message):
     23     print(f"\033[91m{message}\033[0m", flush=True)
     24     return False
     25 
     26 def CompareNPY(ref, ref_icc, dec, dec_icc, frame_idx, rmse_limit, peak_error):
     27     """Compare a decoded numpy against the reference one."""
     28     if ref.shape != dec.shape:
     29         return Failure(f'Expected shape {ref.shape} but found {dec.shape}')
     30     ref_frame = ref[frame_idx]
     31     dec_frame = dec[frame_idx]
     32     num_channels = ref_frame.shape[2]
     33 
     34     if ref_icc != dec_icc:
     35         # Transform colors before comparison.
     36         if num_channels < 3:
     37             return Failure(f"Only RGB images are supported")
     38         dec_clr = dec_frame[:, :, 0:3]
     39         dec_frame[:, :, 0:3] = lcms2.convert_pixels(dec_icc, ref_icc, dec_clr)
     40 
     41     error = numpy.abs(ref_frame - dec_frame)
     42     actual_peak_error = error.max()
     43     error_by_channel = [error[:, :, ch] for ch in range(num_channels)]
     44     actual_rmses = [numpy.sqrt(numpy.mean(error_ch * error_ch)) for error_ch in error_by_channel]
     45     actual_rmse = max(actual_rmses)
     46 
     47     print(f"RMSE: {actual_rmses}, peak error: {actual_peak_error}", flush=True)
     48 
     49     if actual_rmse > rmse_limit:
     50         return Failure(f"RMSE too large: {actual_rmse} > {rmse_limit}")
     51 
     52     if actual_peak_error > peak_error:
     53         return Failure(
     54             f"Peak error too large: {actual_peak_error} > {peak_error}")
     55     return True
     56 
     57 
     58 def CompareBinaries(ref_bin, dec_bin):
     59     """Compare a decoded binary file against the reference for exact contents."""
     60     with open(ref_bin, 'rb') as reff:
     61         ref_data = reff.read()
     62 
     63     with open(dec_bin, 'rb') as decf:
     64         dec_data = decf.read()
     65 
     66     if ref_data != dec_data:
     67         return Failure(
     68             f'Binary files mismatch: {ref_bin} {dec_bin}')
     69     return True
     70 
     71 
     72 TEST_KEYS = set(
     73     ['reconstructed_jpeg', 'original_icc', 'rms_error', 'peak_error'])
     74 
     75 
     76 def CheckMeta(dec, ref):
     77     if isinstance(ref, dict):
     78         if not isinstance(dec, dict):
     79             return Failure("Malformed metadata file")
     80         for k, v in ref.items():
     81             if k in TEST_KEYS:
     82                 continue
     83             if k not in dec:
     84                 return Failure(
     85                     f"Malformed metadata file: key {k} not found")
     86             vv = dec[k]
     87             return CheckMeta(vv, v)
     88     elif isinstance(ref, list):
     89         if not isinstance(dec, list) or len(dec) != len(ref):
     90             return Failure("Malformed metadata file")
     91         for vv, v in zip(dec, ref):
     92             return CheckMeta(vv, v)
     93     elif isinstance(ref, float):
     94         if not isinstance(dec, float):
     95             return Failure("Malformed metadata file")
     96         if abs(dec - ref) > 0.0001:
     97             return Failure(
     98                 f"Metadata: Expected {ref}, found {dec}")
     99     elif dec != ref:
    100         return Failure(f"Metadata: Expected {ref}, found {dec}")
    101     return True
    102 
    103 
    104 def ConformanceTestRunner(args):
    105     ok = True
    106     # We can pass either the .txt file or the directory which defaults to the
    107     # full corpus. This is useful to run a subset of the corpus in other .txt
    108     # files.
    109     if os.path.isdir(args.corpus):
    110         corpus_dir = args.corpus
    111         corpus_txt = os.path.join(args.corpus, 'corpus.txt')
    112     else:
    113         corpus_dir = os.path.dirname(args.corpus)
    114         corpus_txt = args.corpus
    115 
    116     with open(corpus_txt, 'r') as f:
    117         for test_id in f:
    118             test_id = test_id.rstrip('\n')
    119             print(f"\033[94m\033[1mTesting {test_id}\033[0m", flush=True)
    120             test_dir = os.path.join(corpus_dir, test_id)
    121 
    122             with open(os.path.join(test_dir, 'test.json'), 'r') as f:
    123                 descriptor = json.load(f)
    124                 if 'sha256sums' in descriptor:
    125                     del descriptor['sha256sums']
    126 
    127             exact_tests = []
    128 
    129             with tempfile.TemporaryDirectory(prefix=test_id) as work_dir:
    130                 input_filename = os.path.join(test_dir, 'input.jxl')
    131                 pixel_prefix = os.path.join(work_dir, 'decoded')
    132                 output_filename = pixel_prefix + '_image.npy'
    133                 cmd = [args.decoder, input_filename, output_filename]
    134                 cmd_jpeg = []
    135                 if 'preview' in descriptor:
    136                     preview_filename = os.path.join(work_dir,
    137                                                     'decoded_preview.npy')
    138                     cmd.extend(['--preview_out', preview_filename])
    139                 if 'reconstructed_jpeg' in descriptor:
    140                     jpeg_filename = os.path.join(work_dir, 'reconstructed.jpg')
    141                     cmd_jpeg = [args.decoder, input_filename, jpeg_filename]
    142                     exact_tests.append(('reconstructed.jpg', jpeg_filename))
    143                 if 'original_icc' in descriptor:
    144                     decoded_original_icc = os.path.join(
    145                         work_dir, 'decoded_org.icc')
    146                     cmd.extend(['--orig_icc_out', decoded_original_icc])
    147                     exact_tests.append(('original.icc', decoded_original_icc))
    148                 meta_filename = os.path.join(work_dir, 'meta.json')
    149                 cmd.extend(['--metadata_out', meta_filename])
    150                 cmd.extend(['--icc_out', pixel_prefix + '.icc'])
    151                 cmd.extend(['--norender_spotcolors'])
    152 
    153                 print(f"Running: {cmd}", flush=True)
    154                 if subprocess.call(cmd) != 0:
    155                     ok = Failure('Running the decoder (%s) returned error' %
    156                                  ' '.join(cmd))
    157                     continue
    158                 if cmd_jpeg:
    159                     print(f"Running: {cmd_jpeg}", flush=True)
    160                     if subprocess.call(cmd_jpeg) != 0:
    161                         ok = Failure(
    162                             'Running the decoder (%s) returned error' %
    163                             ' '.join(cmd_jpeg))
    164                         continue
    165 
    166                 # Run validation of exact files.
    167                 for reference_basename, decoded_filename in exact_tests:
    168                     reference_filename = os.path.join(test_dir,
    169                                                       reference_basename)
    170                     binary_ok = CompareBinaries(reference_filename,
    171                                                 decoded_filename)
    172                     if not binary_ok and args.update_on_failure:
    173                         os.unlink(reference_filename)
    174                         shutil.copy2(decoded_filename, reference_filename)
    175                         binary_ok = True
    176                     ok = ok & binary_ok
    177 
    178                 # Validate metadata.
    179                 with open(meta_filename, 'r') as f:
    180                     meta = json.load(f)
    181 
    182                 ok = ok & CheckMeta(meta, descriptor)
    183 
    184                 # Pixel data.
    185                 decoded_icc = pixel_prefix + '.icc'
    186                 with open(decoded_icc, 'rb') as f:
    187                     decoded_icc = f.read()
    188                 reference_icc = os.path.join(test_dir, "reference.icc")
    189                 with open(reference_icc, 'rb') as f:
    190                     reference_icc = f.read()
    191 
    192                 reference_npy_fn = os.path.join(test_dir, 'reference_image.npy')
    193                 decoded_npy_fn = os.path.join(work_dir, 'decoded_image.npy')
    194 
    195                 if not os.path.exists(decoded_npy_fn):
    196                     ok = Failure('File not decoded: decoded_image.npy')
    197                     continue
    198 
    199                 reference_npy = numpy.load(reference_npy_fn)
    200                 decoded_npy = numpy.load(decoded_npy_fn)
    201 
    202                 frames_ok = True
    203                 for i, fd in enumerate(descriptor['frames']):
    204                     frames_ok = frames_ok & CompareNPY(
    205                         reference_npy, reference_icc, decoded_npy,
    206                         decoded_icc, i, fd['rms_error'],
    207                         fd['peak_error'])
    208 
    209                 if not frames_ok and args.update_on_failure:
    210                     os.unlink(reference_npy_fn)
    211                     shutil.copy2(decoded_npy_fn, reference_npy_fn)
    212                     frames_ok = True
    213                 ok = ok & frames_ok
    214 
    215                 if 'preview' in descriptor:
    216                     reference_npy_fn = os.path.join(test_dir,
    217                                                     'reference_preview.npy')
    218                     decoded_npy_fn = os.path.join(work_dir,
    219                                                   'decoded_preview.npy')
    220 
    221                     if not os.path.exists(decoded_npy_fn):
    222                         ok = Failure(
    223                             'File not decoded: decoded_preview.npy')
    224 
    225                     reference_npy = numpy.load(reference_npy_fn)
    226                     decoded_npy = numpy.load(decoded_npy_fn)
    227                     preview_ok = CompareNPY(reference_npy, reference_icc,
    228                                             decoded_npy, decoded_icc, 0,
    229                                             descriptor['preview']['rms_error'],
    230                                             descriptor['preview']['peak_error'])
    231                     if not preview_ok & args.update_on_failure:
    232                         os.unlink(reference_npy_fn)
    233                         shutil.copy2(decoded_npy_fn, reference_npy_fn)
    234                         preview_ok = True
    235                     ok = ok & preview_ok
    236 
    237     return ok
    238 
    239 
    240 def main():
    241     parser = argparse.ArgumentParser(description=__doc__)
    242     parser.add_argument('--decoder',
    243                         metavar='DECODER',
    244                         required=True,
    245                         help='path to the decoder binary under test.')
    246     parser.add_argument(
    247         '--corpus',
    248         metavar='CORPUS',
    249         required=True,
    250         help=('path to the corpus directory or corpus descriptor'
    251               ' text file.'))
    252     parser.add_argument(
    253         '--update_on_failure', action='store_true',
    254         help='If set, updates reference files on failing checks.')
    255     args = parser.parse_args()
    256     if not ConformanceTestRunner(args):
    257         sys.exit(1)
    258 
    259 
    260 if __name__ == '__main__':
    261     main()