wafcache.py (18813B)
#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2019 (ita)

"""
Filesystem-based cache system to share and re-use build artifacts

Cache access operations (copy to and from) are delegated to
independent pre-forked worker subprocesses.

The following environment variables may be set:
* WAFCACHE: several possibilities:
  - File cache:
    absolute path of the waf cache (~/.cache/wafcache_user,
    where `user` represents the currently logged-in user)
  - URL to a cache server, for example:
    export WAFCACHE=http://localhost:8080/files/
    in that case, GET/POST requests are made to urls of the form
    http://localhost:8080/files/000000000/0 (cache management is delegated to the server)
  - GCS, S3 or MINIO bucket
    gs://my-bucket/    (uses gsutil command line tool or WAFCACHE_CMD)
    s3://my-bucket/    (uses aws command line tool or WAFCACHE_CMD)
    minio://my-bucket/ (uses mc command line tool or WAFCACHE_CMD)
* WAFCACHE_CMD: bucket upload/download command, for example:
  WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}"
  Note that the WAFCACHE bucket value is used for the source or destination
  depending on the operation (upload or download). For example, with:
  WAFCACHE="gs://mybucket/"
  the following commands may be run:
  gsutil cp build/myprogram gs://mybucket/aa/aaaaa/1
  gsutil cp gs://mybucket/bb/bbbbb/2 build/somefile
* WAFCACHE_NO_PUSH: if set, disables pushing to the cache
* WAFCACHE_VERBOSITY: if set, displays more detailed cache operations
* WAFCACHE_STATS: if set, displays cache usage statistics on exit

File cache specific options:
  Files are copied using hard links by default; if the cache is located
  onto another partition, the system switches to file copies instead.
* WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try
                                   and trim the cache (3 minutes)

Upload specific options:
* WAFCACHE_ASYNC_WORKERS: define a number of workers to upload results asynchronously
                          this may improve build performance with many/long file uploads
                          the default is unset (synchronous uploads)
* WAFCACHE_ASYNC_NOWAIT: do not wait for uploads to complete (default: False)
                         this requires asynchronous uploads to have an effect

Usage::

	def build(bld):
		bld.load('wafcache')
		...

To troubleshoot::

	waf clean build --zone=wafcache
"""

import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, threading, traceback, urllib3, shlex
try:
	import subprocess32 as subprocess
except ImportError:
	import subprocess

# Default cache location: ~/.cache/wafcache_<user>, falling back to /tmp
# when the user has no ~/.cache directory
base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
	base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())

# Configuration knobs, all overridable through the environment (see module docstring)
CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
WAFCACHE_CMD = os.environ.get('WAFCACHE_CMD')
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0
WAFCACHE_ASYNC_WORKERS = os.environ.get('WAFCACHE_ASYNC_WORKERS')
WAFCACHE_ASYNC_NOWAIT = os.environ.get('WAFCACHE_ASYNC_NOWAIT')
OK = "ok"

# Placeholders substituted in WAFCACHE_CMD by bucket_cache.bucket_copy
re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')

try:
	import cPickle
except ImportError:
	import pickle as cPickle

if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build

	def can_retrieve_cache(self):
		"""
		New method for waf Task classes: attempt to fetch all task outputs
		from the cache through a pooled worker process.

		Returns True (and sets self.cached) when every output was restored,
		False when the task must run normally.
		"""
		if not self.outputs:
			return False

		self.cached = False

		sig = self.signature()
		ssig = Utils.to_hex(self.uid() + sig)

		if WAFCACHE_STATS:
			self.generator.bld.cache_reqs += 1

		files_to = [node.abspath() for node in self.outputs]
		proc = get_process()
		err = cache_command(proc, ssig, [], files_to)
		# the worker is returned to the pool whether the fetch succeeded or not
		process_pool.append(proc)
		if err.startswith(OK):
			if WAFCACHE_VERBOSITY:
				Logs.pprint('CYAN', '  Fetched %r from cache' % files_to)
			else:
				Logs.debug('wafcache: fetched %r from cache', files_to)
			if WAFCACHE_STATS:
				self.generator.bld.cache_hits += 1
		else:
			if WAFCACHE_VERBOSITY:
				Logs.pprint('YELLOW', '  No cache entry %s' % files_to)
			else:
				Logs.debug('wafcache: No cache entry %s: %s', files_to, err)
			return False

		self.cached = True
		return True

	def put_files_cache(self):
		"""
		New method for waf Task classes: push the task outputs to the cache
		after a successful run, unless pushing is disabled or the outputs
		came from the cache in the first place.
		"""
		if WAFCACHE_NO_PUSH or getattr(self, 'cached', None) or not self.outputs:
			return

		files_from = []
		for node in self.outputs:
			path = node.abspath()
			if not os.path.isfile(path):
				# missing output: do not cache a partial result
				return
			files_from.append(path)

		bld = self.generator.bld
		old_sig = self.signature()

		# invalidate the memoized input signatures so that self.signature()
		# below re-hashes the inputs from disk
		for node in self.inputs:
			try:
				del node.ctx.cache_sig[node]
			except KeyError:
				pass

		delattr(self, 'cache_sig')
		sig = self.signature()

		def _async_put_files_cache(bld, ssig, files_from):
			# runs either inline or on a ThreadPoolExecutor worker
			proc = get_process()
			if WAFCACHE_ASYNC_WORKERS:
				with bld.wafcache_lock:
					if bld.wafcache_stop:
						process_pool.append(proc)
						return
					bld.wafcache_procs.add(proc)

			err = cache_command(proc, ssig, files_from, [])
			process_pool.append(proc)
			if err.startswith(OK):
				if WAFCACHE_VERBOSITY:
					Logs.pprint('CYAN', '  Successfully uploaded %s to cache' % files_from)
				else:
					Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
				if WAFCACHE_STATS:
					# NOTE(review): unsynchronized increment; with async workers the
					# statistics may under-count under contention
					bld.cache_puts += 1
			else:
				if WAFCACHE_VERBOSITY:
					Logs.pprint('RED', '  Error caching step results %s: %s' % (files_from, err))
				else:
					Logs.debug('wafcache: Error caching results %s: %s', files_from, err)

		if old_sig == sig:
			ssig = Utils.to_hex(self.uid() + sig)
			if WAFCACHE_ASYNC_WORKERS:
				fut = bld.wafcache_executor.submit(_async_put_files_cache, bld, ssig, files_from)
				bld.wafcache_uploads.append(fut)
			else:
				_async_put_files_cache(bld, ssig, files_from)
		else:
			# an input changed while the task was running: the outputs no longer
			# correspond to the signature, so caching them would poison the cache
			Logs.debug('wafcache: skipped %r upload due to late input modifications %r', self.outputs, self.inputs)

		bld.task_sigs[self.uid()] = self.cache_sig

	def hash_env_vars(self, env, vars_lst):
		"""
		Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths
		"""
		if not env.table:
			env = env.parent
			if not env:
				return Utils.SIG_NIL

		idx = str(id(env)) + str(vars_lst)
		try:
			cache = self.cache_env
		except AttributeError:
			cache = self.cache_env = {}
		else:
			try:
				return self.cache_env[idx]
			except KeyError:
				pass

		v = str([env[a] for a in vars_lst])
		# strip the absolute source directory from the hashed values so that
		# identical projects checked out in different locations share cache entries
		v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
		m = Utils.md5()
		m.update(v.encode())
		ret = m.digest()

		Logs.debug('envhash: %r %r', ret, v)

		cache[idx] = ret

		return ret

	def uid(self):
		"""
		Reimplement Task.uid() so that the signature does not depend on local paths
		"""
		try:
			return self.uid_
		except AttributeError:
			m = Utils.md5()
			src = self.generator.bld.srcnode
			up = m.update
			up(self.__class__.__name__.encode())
			# hash project-relative paths rather than absolute ones
			for x in self.inputs + self.outputs:
				up(x.path_from(src).encode())
			self.uid_ = m.digest()
			return self.uid_


	def make_cached(cls):
		"""
		Enable the waf cache for a given task class by wrapping its
		run/post_run methods; idempotent thanks to the has_cache marker
		"""
		if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
			return

		full_name = "%s.%s" % (cls.__module__, cls.__name__)
		# these tasks modify/install files in place and must not be cached
		if full_name in ('waflib.Tools.ccroot.vnum', 'waflib.Build.inst'):
			return

		m1 = getattr(cls, 'run', None)
		def run(self):
			if getattr(self, 'nocache', False):
				return m1(self)
			if self.can_retrieve_cache():
				return 0
			return m1(self)
		cls.run = run

		m2 = getattr(cls, 'post_run', None)
		def post_run(self):
			if getattr(self, 'nocache', False):
				return m2(self)
			ret = m2(self)
			self.put_files_cache()
			return ret
		cls.post_run = post_run
		cls.has_cache = True

	process_pool = []
	def get_process():
		"""
		Returns a worker process that can process waf cache commands
		The worker process is assumed to be returned to the process pool when unused
		"""
		try:
			return process_pool.pop()
		except IndexError:
			# spawn this very file as a worker: it runs the __main__ loop below
			filepath = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py'
			cmd = [sys.executable, '-c', Utils.readf(filepath)]
			return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)

	def atexit_pool():
		# terminate any idle workers when the build process exits
		for proc in process_pool:
			proc.kill()
	atexit.register(atexit_pool)

	def build(bld):
		"""
		Called during the build process to enable file caching
		"""
		if WAFCACHE_ASYNC_WORKERS:
			try:
				num_workers = int(WAFCACHE_ASYNC_WORKERS)
			except ValueError:
				Logs.warn('Invalid WAFCACHE_ASYNC_WORKERS specified: %r' % WAFCACHE_ASYNC_WORKERS)
			else:
				from concurrent.futures import ThreadPoolExecutor
				bld.wafcache_executor = ThreadPoolExecutor(max_workers=num_workers)
				bld.wafcache_uploads = []
				bld.wafcache_procs = set()
				bld.wafcache_stop = False
				bld.wafcache_lock = threading.Lock()

				def finalize_upload_async(bld):
					if WAFCACHE_ASYNC_NOWAIT:
						# cancel pending uploads and kill in-flight workers
						with bld.wafcache_lock:
							bld.wafcache_stop = True

						for fut in reversed(bld.wafcache_uploads):
							fut.cancel()

						for proc in bld.wafcache_procs:
							proc.kill()

						bld.wafcache_procs.clear()
					else:
						Logs.pprint('CYAN', '... waiting for wafcache uploads to complete (%s uploads)' % len(bld.wafcache_uploads))
						bld.wafcache_executor.shutdown(wait=True)
				bld.add_post_fun(finalize_upload_async)

		if WAFCACHE_STATS:
			# Init counter for statistics and hook to print results at the end
			bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0

			def printstats(bld):
				hit_ratio = 0
				if bld.cache_reqs > 0:
					hit_ratio = (bld.cache_hits / bld.cache_reqs) * 100
				Logs.pprint('CYAN', '  wafcache stats: %s requests, %s hits (ratio: %.2f%%), %s writes' %
					 (bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts) )
			bld.add_post_fun(printstats)

		if process_pool:
			# already called once
			return

		# pre-allocation
		processes = [get_process() for x in range(bld.jobs)]
		process_pool.extend(processes)

		Task.Task.can_retrieve_cache = can_retrieve_cache
		Task.Task.put_files_cache = put_files_cache
		Task.Task.uid = uid
		Build.BuildContext.hash_env_vars = hash_env_vars
		for x in reversed(list(Task.classes.values())):
			make_cached(x)

	def cache_command(proc, sig, files_from, files_to):
		"""
		Send a command to a cache worker process and wait for its answer.

		The wire format is a pickled, base64-encoded tuple containing the
		task signature, a list of files to cache and a list of files to get
		from the cache (one of the two lists is assumed to be empty).

		Returns the worker's unpickled reply string (OK or a traceback);
		raises OSError if the worker died.
		"""
		obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
		proc.stdin.write(obj)
		proc.stdin.write('\n'.encode())
		proc.stdin.flush()
		obj = proc.stdout.readline()
		if not obj:
			raise OSError('Preforked sub-process %r died' % proc.pid)
		return cPickle.loads(base64.b64decode(obj))
# Prefer hard links for speed; fall back to real copies when hard links
# are unavailable on this platform.
# BUGFIX: a missing os.link raises AttributeError, not NameError, so the
# previous `except NameError` clause could never trigger the fallback.
try:
	copyfun = os.link
except AttributeError:
	copyfun = shutil.copy2

def atomic_copy(orig, dest):
	"""
	Copy a file to the cache; the operation is atomic for a given file:
	the data is first written to `dest + '.tmp'` and then renamed over dest.

	Falls back permanently to shutil.copy2 when hard-linking fails with
	EXDEV (cache on a different filesystem).
	"""
	global copyfun
	tmp = dest + '.tmp'
	up = os.path.dirname(dest)
	try:
		os.makedirs(up)
	except OSError:
		# the directory may already exist (concurrent workers)
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno == errno.EXDEV:
			# cross-device link: switch to plain copies for all future calls
			copyfun = shutil.copy2
			copyfun(orig, tmp)
		else:
			raise
	os.rename(tmp, dest)

def lru_trim():
	"""
	the cache folders take the form:
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`
	they are listed in order of last access, and then removed
	until the amount of folders is within TRIM_MAX_FOLDERS and the total space
	taken by files is less than EVICT_MAX_BYTES
	"""
	lst = []
	for up in os.listdir(CACHE_DIR):
		if len(up) == 2:
			sub = os.path.join(CACHE_DIR, up)
			for hval in os.listdir(sub):
				path = os.path.join(sub, hval)

				size = 0
				for fname in os.listdir(path):
					try:
						size += os.lstat(os.path.join(path, fname)).st_size
					except OSError:
						# the entry may be removed concurrently; ignore
						pass
				lst.append((os.stat(path).st_mtime, size, path))

	# most recently used entries first; evict from the tail
	lst.sort(key=lambda x: x[0])
	lst.reverse()

	tot = sum(x[1] for x in lst)
	while tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS:
		_, tmp_size, path = lst.pop()
		tot -= tmp_size

		# rename first so that the removal itself does not need to be atomic
		tmp = path + '.remove'
		try:
			shutil.rmtree(tmp)
		except OSError:
			pass
		try:
			os.rename(path, tmp)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, tmp))
		else:
			try:
				shutil.rmtree(tmp)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % tmp)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))


def lru_evict():
	"""
	Reduce the cache size, at most once every EVICT_INTERVAL_MINUTES;
	an flock on CACHE_DIR/all.lock ensures a single cleaner at a time
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			# first run: create the lock file and skip trimming this time
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes if the cache is too big
		# OCLOEXEC is unnecessary because no processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			try:
				fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
			except EnvironmentError:
				if WAFCACHE_VERBOSITY:
					sys.stderr.write('wafcache: another cleaning process is running\n')
			else:
				# now do the actual cleanup
				lru_trim()
				# record the time of this cleanup pass
				os.utime(lockfile, None)
		finally:
			os.close(fd)

class netcache(object):
	"""
	Cache backend that talks to an HTTP cache server;
	CACHE_DIR is used as the base URL (see the module docstring)
	"""
	def __init__(self):
		self.http = urllib3.PoolManager()

	def url_of(self, sig, i):
		# one URL per (task signature, output index) pair
		return "%s/%s/%s" % (CACHE_DIR, sig, i)

	def upload(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with open(file_path, 'rb') as f:
			file_data = f.read()
		r = self.http.request('POST', url, timeout=60,
			fields={ 'file': ('%s/%s' % (sig, i), file_data), })
		if r.status >= 400:
			raise OSError("Invalid status %r %r" % (url, r.status))

	def download(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with self.http.request('GET', url, preload_content=False, timeout=60) as inf:
			if inf.status >= 400:
				raise OSError("Invalid status %r %r" % (url, inf.status))
			with open(file_path, 'wb') as out:
				shutil.copyfileobj(inf, out)

	def copy_to_cache(self, sig, files_from, files_to):
		# returns OK on success, or a traceback string reported to the parent
		try:
			for i, x in enumerate(files_from):
				if not os.path.islink(x):
					self.upload(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				self.download(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

class fcache(object):
	"""
	Filesystem cache backend storing entries under CACHE_DIR/<sig[:2]>/<sig>/<index>
	"""
	def __init__(self):
		if not os.path.exists(CACHE_DIR):
			try:
				os.makedirs(CACHE_DIR)
			except OSError:
				# a concurrent worker may have created it; re-checked below
				pass
		if not os.path.exists(CACHE_DIR):
			raise ValueError('Could not initialize the cache directory')

	def copy_to_cache(self, sig, files_from, files_to):
		"""
		Copy files to the cache, existing files are overwritten,
		and the copy is atomic only for a given file, not for all files
		that belong to a given task object
		"""
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		else:
			# attempt trimming if caching was successful:
			# we may have things to trim!
			try:
				lru_evict()
			except Exception:
				return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""
		Copy files from the cache
		"""
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(orig, x)

			# success! update the cache time so lru_trim keeps this entry
			os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
		except Exception:
			return traceback.format_exc()
		return OK

class bucket_cache(object):
	"""
	Cache backend delegating transfers to a bucket command line tool
	(aws/gsutil/mc, or an arbitrary WAFCACHE_CMD template)
	"""
	def bucket_copy(self, source, target):
		if WAFCACHE_CMD:
			# substitute %{SRC} and %{TGT} in the user-provided template
			def replacer(match):
				if match.group('src'):
					return source
				elif match.group('tgt'):
					return target
			cmd = [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)]
		elif CACHE_DIR.startswith('s3://'):
			cmd = ['aws', 's3', 'cp', source, target]
		elif CACHE_DIR.startswith('gs://'):
			cmd = ['gsutil', 'cp', source, target]
		else:
			cmd = ['mc', 'cp', source, target]

		proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = proc.communicate()
		if proc.returncode:
			raise OSError('Error copy %r to %r using: %r (exit %r):\n  out:%s\n  err:%s' % (
				source, target, cmd, proc.returncode, out.decode(errors='replace'), err.decode(errors='replace')))

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(orig, x)
		# CONSISTENCY FIX: the three sibling copy_* methods catch Exception;
		# catching only EnvironmentError here let other errors kill the worker
		# instead of being reported back to the build process as a string
		except Exception:
			return traceback.format_exc()
		return OK

def loop(service):
	"""
	This function is run when this file is run as a standalone python script,
	it assumes a parent process that will communicate the commands to it
	as pickled-encoded tuples (one line per command)

	The commands are to copy files to the cache or copy files from the
	cache to a target destination
	"""
	# one operation is performed at a single time by a single process
	# therefore stdin never has more than one line
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)
	ret = OK

	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# TODO return early when pushing files upstream
		ret = service.copy_to_cache(sig, files_from, files_to)
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = service.copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	obj = base64.b64encode(cPickle.dumps(ret))
	sys.stdout.write(obj.decode())
	sys.stdout.write('\n')
	sys.stdout.flush()

if __name__ == '__main__':
	# worker entry point: pick the backend matching the WAFCACHE scheme
	if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://') or CACHE_DIR.startswith('minio://'):
		if CACHE_DIR.startswith('minio://'):
			CACHE_DIR = CACHE_DIR[8:]   # minio doesn't need the protocol part, uses config aliases
		service = bucket_cache()
	elif CACHE_DIR.startswith('http'):
		service = netcache()
	else:
		service = fcache()
	while 1:
		try:
			loop(service)
		except KeyboardInterrupt:
			break