conformance.py (10347B)
1 #!/usr/bin/env python3 2 # Copyright (c) the JPEG XL Project Authors. All rights reserved. 3 # 4 # Use of this source code is governed by a BSD-style 5 # license that can be found in the LICENSE file. 6 """JPEG XL conformance test runner. 7 8 Tool to perform a conformance test for a decoder. 9 """ 10 11 import argparse 12 import json 13 import numpy 14 import os 15 import shutil 16 import subprocess 17 import sys 18 import tempfile 19 20 import lcms2 21 22 def Failure(message): 23 print(f"\033[91m{message}\033[0m", flush=True) 24 return False 25 26 def CompareNPY(ref, ref_icc, dec, dec_icc, frame_idx, rmse_limit, peak_error): 27 """Compare a decoded numpy against the reference one.""" 28 if ref.shape != dec.shape: 29 return Failure(f'Expected shape {ref.shape} but found {dec.shape}') 30 ref_frame = ref[frame_idx] 31 dec_frame = dec[frame_idx] 32 num_channels = ref_frame.shape[2] 33 34 if ref_icc != dec_icc: 35 # Transform colors before comparison. 36 if num_channels < 3: 37 return Failure(f"Only RGB images are supported") 38 dec_clr = dec_frame[:, :, 0:3] 39 dec_frame[:, :, 0:3] = lcms2.convert_pixels(dec_icc, ref_icc, dec_clr) 40 41 error = numpy.abs(ref_frame - dec_frame) 42 actual_peak_error = error.max() 43 error_by_channel = [error[:, :, ch] for ch in range(num_channels)] 44 actual_rmses = [numpy.sqrt(numpy.mean(error_ch * error_ch)) for error_ch in error_by_channel] 45 actual_rmse = max(actual_rmses) 46 47 print(f"RMSE: {actual_rmses}, peak error: {actual_peak_error}", flush=True) 48 49 if actual_rmse > rmse_limit: 50 return Failure(f"RMSE too large: {actual_rmse} > {rmse_limit}") 51 52 if actual_peak_error > peak_error: 53 return Failure( 54 f"Peak error too large: {actual_peak_error} > {peak_error}") 55 return True 56 57 58 def CompareBinaries(ref_bin, dec_bin): 59 """Compare a decoded binary file against the reference for exact contents.""" 60 with open(ref_bin, 'rb') as reff: 61 ref_data = reff.read() 62 63 with open(dec_bin, 'rb') as decf: 64 dec_data = decf.read() 65 66 if ref_data != dec_data: 67 return Failure( 68 f'Binary files mismatch: {ref_bin} {dec_bin}') 69 return True 70 71 72 TEST_KEYS = set( 73 ['reconstructed_jpeg', 'original_icc', 'rms_error', 'peak_error']) 74 75 76 def CheckMeta(dec, ref): 77 if isinstance(ref, dict): 78 if not isinstance(dec, dict): 79 return Failure("Malformed metadata file") 80 for k, v in ref.items(): 81 if k in TEST_KEYS: 82 continue 83 if k not in dec: 84 return Failure( 85 f"Malformed metadata file: key {k} not found") 86 vv = dec[k] 87 return CheckMeta(vv, v) 88 elif isinstance(ref, list): 89 if not isinstance(dec, list) or len(dec) != len(ref): 90 return Failure("Malformed metadata file") 91 for vv, v in zip(dec, ref): 92 return CheckMeta(vv, v) 93 elif isinstance(ref, float): 94 if not isinstance(dec, float): 95 return Failure("Malformed metadata file") 96 if abs(dec - ref) > 0.0001: 97 return Failure( 98 f"Metadata: Expected {ref}, found {dec}") 99 elif dec != ref: 100 return Failure(f"Metadata: Expected {ref}, found {dec}") 101 return True 102 103 104 def ConformanceTestRunner(args): 105 ok = True 106 # We can pass either the .txt file or the directory which defaults to the 107 # full corpus. This is useful to run a subset of the corpus in other .txt 108 # files. 109 if os.path.isdir(args.corpus): 110 corpus_dir = args.corpus 111 corpus_txt = os.path.join(args.corpus, 'corpus.txt') 112 else: 113 corpus_dir = os.path.dirname(args.corpus) 114 corpus_txt = args.corpus 115 116 with open(corpus_txt, 'r') as f: 117 for test_id in f: 118 test_id = test_id.rstrip('\n') 119 print(f"\033[94m\033[1mTesting {test_id}\033[0m", flush=True) 120 test_dir = os.path.join(corpus_dir, test_id) 121 122 with open(os.path.join(test_dir, 'test.json'), 'r') as f: 123 descriptor = json.load(f) 124 if 'sha256sums' in descriptor: 125 del descriptor['sha256sums'] 126 127 exact_tests = [] 128 129 with tempfile.TemporaryDirectory(prefix=test_id) as work_dir: 130 input_filename = os.path.join(test_dir, 'input.jxl') 131 pixel_prefix = os.path.join(work_dir, 'decoded') 132 output_filename = pixel_prefix + '_image.npy' 133 cmd = [args.decoder, input_filename, output_filename] 134 cmd_jpeg = [] 135 if 'preview' in descriptor: 136 preview_filename = os.path.join(work_dir, 137 'decoded_preview.npy') 138 cmd.extend(['--preview_out', preview_filename]) 139 if 'reconstructed_jpeg' in descriptor: 140 jpeg_filename = os.path.join(work_dir, 'reconstructed.jpg') 141 cmd_jpeg = [args.decoder, input_filename, jpeg_filename] 142 exact_tests.append(('reconstructed.jpg', jpeg_filename)) 143 if 'original_icc' in descriptor: 144 decoded_original_icc = os.path.join( 145 work_dir, 'decoded_org.icc') 146 cmd.extend(['--orig_icc_out', decoded_original_icc]) 147 exact_tests.append(('original.icc', decoded_original_icc)) 148 meta_filename = os.path.join(work_dir, 'meta.json') 149 cmd.extend(['--metadata_out', meta_filename]) 150 cmd.extend(['--icc_out', pixel_prefix + '.icc']) 151 cmd.extend(['--norender_spotcolors']) 152 153 print(f"Running: {cmd}", flush=True) 154 if subprocess.call(cmd) != 0: 155 ok = Failure('Running the decoder (%s) returned error' % 156 ' '.join(cmd)) 157 continue 158 if cmd_jpeg: 159 print(f"Running: {cmd_jpeg}", flush=True) 160 if subprocess.call(cmd_jpeg) != 0: 161 ok = Failure( 162 'Running the decoder (%s) returned error' % 163 ' '.join(cmd_jpeg)) 164 continue 165 166 # Run validation of exact files. 167 for reference_basename, decoded_filename in exact_tests: 168 reference_filename = os.path.join(test_dir, 169 reference_basename) 170 binary_ok = CompareBinaries(reference_filename, 171 decoded_filename) 172 if not binary_ok and args.update_on_failure: 173 os.unlink(reference_filename) 174 shutil.copy2(decoded_filename, reference_filename) 175 binary_ok = True 176 ok = ok & binary_ok 177 178 # Validate metadata. 179 with open(meta_filename, 'r') as f: 180 meta = json.load(f) 181 182 ok = ok & CheckMeta(meta, descriptor) 183 184 # Pixel data. 185 decoded_icc = pixel_prefix + '.icc' 186 with open(decoded_icc, 'rb') as f: 187 decoded_icc = f.read() 188 reference_icc = os.path.join(test_dir, "reference.icc") 189 with open(reference_icc, 'rb') as f: 190 reference_icc = f.read() 191 192 reference_npy_fn = os.path.join(test_dir, 'reference_image.npy') 193 decoded_npy_fn = os.path.join(work_dir, 'decoded_image.npy') 194 195 if not os.path.exists(decoded_npy_fn): 196 ok = Failure('File not decoded: decoded_image.npy') 197 continue 198 199 reference_npy = numpy.load(reference_npy_fn) 200 decoded_npy = numpy.load(decoded_npy_fn) 201 202 frames_ok = True 203 for i, fd in enumerate(descriptor['frames']): 204 frames_ok = frames_ok & CompareNPY( 205 reference_npy, reference_icc, decoded_npy, 206 decoded_icc, i, fd['rms_error'], 207 fd['peak_error']) 208 209 if not frames_ok and args.update_on_failure: 210 os.unlink(reference_npy_fn) 211 shutil.copy2(decoded_npy_fn, reference_npy_fn) 212 frames_ok = True 213 ok = ok & frames_ok 214 215 if 'preview' in descriptor: 216 reference_npy_fn = os.path.join(test_dir, 217 'reference_preview.npy') 218 decoded_npy_fn = os.path.join(work_dir, 219 'decoded_preview.npy') 220 221 if not os.path.exists(decoded_npy_fn): 222 ok = Failure( 223 'File not decoded: decoded_preview.npy') 224 225 reference_npy = numpy.load(reference_npy_fn) 226 decoded_npy = numpy.load(decoded_npy_fn) 227 preview_ok = CompareNPY(reference_npy, reference_icc, 228 decoded_npy, decoded_icc, 0, 229 descriptor['preview']['rms_error'], 230 descriptor['preview']['peak_error']) 231 if not preview_ok & args.update_on_failure: 232 os.unlink(reference_npy_fn) 233 shutil.copy2(decoded_npy_fn, reference_npy_fn) 234 preview_ok = True 235 ok = ok & preview_ok 236 237 return ok 238 239 240 def main(): 241 parser = argparse.ArgumentParser(description=__doc__) 242 parser.add_argument('--decoder', 243 metavar='DECODER', 244 required=True, 245 help='path to the decoder binary under test.') 246 parser.add_argument( 247 '--corpus', 248 metavar='CORPUS', 249 required=True, 250 help=('path to the corpus directory or corpus descriptor' 251 ' text file.')) 252 parser.add_argument( 253 '--update_on_failure', action='store_true', 254 help='If set, updates reference files on failing checks.') 255 args = parser.parse_args() 256 if not ConformanceTestRunner(args): 257 sys.exit(1) 258 259 260 if __name__ == '__main__': 261 main()