waf

FORK: waf with some random patches
git clone https://git.neptards.moe/neptards/waf.git

wafcache.py (18813B)


#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2019 (ita)

"""
Filesystem-based cache system to share and re-use build artifacts

Cache access operations (copy to and from) are delegated to
independent pre-forked worker subprocesses.

The following environment variables may be set:
* WAFCACHE: several possibilities:
  - File cache:
    absolute path of the waf cache (~/.cache/wafcache_user,
    where `user` represents the currently logged-in user)
  - URL to a cache server, for example:
    export WAFCACHE=http://localhost:8080/files/
    in that case, GET/POST requests are made to urls of the form
    http://localhost:8080/files/000000000/0 (cache management is delegated to the server)
  - GCS, S3 or MINIO bucket
    gs://my-bucket/    (uses gsutil command line tool or WAFCACHE_CMD)
    s3://my-bucket/    (uses aws command line tool or WAFCACHE_CMD)
    minio://my-bucket/ (uses mc command line tool or WAFCACHE_CMD)
* WAFCACHE_CMD: bucket upload/download command, for example:
    WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}"
  Note that the WAFCACHE bucket value is used for the source or destination
  depending on the operation (upload or download). For example, with:
    WAFCACHE="gs://mybucket/"
  the following commands may be run:
    gsutil cp build/myprogram  gs://mybucket/aa/aaaaa/1
    gsutil cp gs://mybucket/bb/bbbbb/2 build/somefile
* WAFCACHE_NO_PUSH: if set, disables pushing to the cache
* WAFCACHE_VERBOSITY: if set, displays more detailed cache operations
* WAFCACHE_STATS: if set, displays cache usage statistics on exit

File cache specific options:
  Files are copied using hard links by default; if the cache is located
  on another partition, the system switches to file copies instead.
* WAFCACHE_TRIM_MAX_FOLDER: maximum number of task folders to keep in the cache (default: 1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum cache size in bytes (default: 10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum interval between attempts
                                   to trim the cache (default: 3 minutes)

Upload specific options:
* WAFCACHE_ASYNC_WORKERS: number of workers uploading results asynchronously;
                          this may improve build performance with many/long file uploads
                          the default is unset (synchronous uploads)
* WAFCACHE_ASYNC_NOWAIT: do not wait for uploads to complete (default: False);
                         this requires asynchronous uploads to have an effect

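A typical shared file-cache setup might combine these variables, for
example (the values below are illustrative only)::

	export WAFCACHE=/mnt/shared/wafcache
	export WAFCACHE_EVICT_MAX_BYTES=20000000000
	export WAFCACHE_STATS=1
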
Usage::

	def build(bld):
		bld.load('wafcache')
		...

To troubleshoot::

	waf clean build --zone=wafcache
"""

import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, threading, traceback, urllib3, shlex
try:
	import subprocess32 as subprocess
except ImportError:
	import subprocess

base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
	base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())

CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
WAFCACHE_CMD = os.environ.get('WAFCACHE_CMD')
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0
WAFCACHE_ASYNC_WORKERS = os.environ.get('WAFCACHE_ASYNC_WORKERS')
WAFCACHE_ASYNC_NOWAIT = os.environ.get('WAFCACHE_ASYNC_NOWAIT')
OK = "ok"

re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')

try:
	import cPickle
except ImportError:
	import pickle as cPickle

if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build

def can_retrieve_cache(self):
	"""
	New method for waf Task classes
	"""
	if not self.outputs:
		return False

	self.cached = False

	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	if WAFCACHE_STATS:
		self.generator.bld.cache_reqs += 1

	files_to = [node.abspath() for node in self.outputs]
	proc = get_process()
	err = cache_command(proc, ssig, [], files_to)
	process_pool.append(proc)
	if err.startswith(OK):
		if WAFCACHE_VERBOSITY:
			Logs.pprint('CYAN', '  Fetched %r from cache' % files_to)
		else:
			Logs.debug('wafcache: fetched %r from cache', files_to)
		if WAFCACHE_STATS:
			self.generator.bld.cache_hits += 1
	else:
		if WAFCACHE_VERBOSITY:
			Logs.pprint('YELLOW', '  No cache entry %s' % files_to)
		else:
			Logs.debug('wafcache: No cache entry %s: %s', files_to, err)
		return False

	self.cached = True
	return True

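# The cache key (ssig above) is the hex form of the task uid (which task this
# is) concatenated with its signature (what it was built from); the file cache
# stores the outputs of one task under CACHE_DIR/<first two hex chars>/<full
# key>/<output index>, as described in lru_trim() below.
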
def put_files_cache(self):
	"""
	New method for waf Task classes
	"""
	if WAFCACHE_NO_PUSH or getattr(self, 'cached', None) or not self.outputs:
		return

	files_from = []
	for node in self.outputs:
		path = node.abspath()
		if not os.path.isfile(path):
			return
		files_from.append(path)

	bld = self.generator.bld
	old_sig = self.signature()

	for node in self.inputs:
		try:
			del node.ctx.cache_sig[node]
		except KeyError:
			pass

	delattr(self, 'cache_sig')
	sig = self.signature()

	def _async_put_files_cache(bld, ssig, files_from):
		proc = get_process()
		if WAFCACHE_ASYNC_WORKERS:
			with bld.wafcache_lock:
				if bld.wafcache_stop:
					process_pool.append(proc)
					return
				bld.wafcache_procs.add(proc)

		err = cache_command(proc, ssig, files_from, [])
		process_pool.append(proc)
		if err.startswith(OK):
			if WAFCACHE_VERBOSITY:
				Logs.pprint('CYAN', '  Successfully uploaded %s to cache' % files_from)
			else:
				Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
			if WAFCACHE_STATS:
				bld.cache_puts += 1
		else:
			if WAFCACHE_VERBOSITY:
				Logs.pprint('RED', '  Error caching step results %s: %s' % (files_from, err))
			else:
				Logs.debug('wafcache: Error caching results %s: %s', files_from, err)

	if old_sig == sig:
		ssig = Utils.to_hex(self.uid() + sig)
		if WAFCACHE_ASYNC_WORKERS:
			fut = bld.wafcache_executor.submit(_async_put_files_cache, bld, ssig, files_from)
			bld.wafcache_uploads.append(fut)
		else:
			_async_put_files_cache(bld, ssig, files_from)
	else:
		Logs.debug('wafcache: skipped %r upload due to late input modifications %r', self.outputs, self.inputs)

	bld.task_sigs[self.uid()] = self.cache_sig

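# Note on the two signature computations above: the cached per-node signatures
# are dropped and the task signature is recomputed after the task has run; if
# an input file was modified while the task was running, the new signature no
# longer matches the old one and the (now stale) outputs are not uploaded.
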
def hash_env_vars(self, env, vars_lst):
	"""
	Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths
	"""
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	idx = str(id(env)) + str(vars_lst)
	try:
		cache = self.cache_env
	except AttributeError:
		cache = self.cache_env = {}
	else:
		try:
			return self.cache_env[idx]
		except KeyError:
			pass

	v = str([env[a] for a in vars_lst])
	v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
	m = Utils.md5()
	m.update(v.encode())
	ret = m.digest()

	Logs.debug('envhash: %r %r', ret, v)

	cache[idx] = ret

	return ret

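# The repr trick above strips the source directory from the hashed values: for
# a project at /home/user/project, repr()[:-1] yields "'/home/user/project"
# (no closing quote), so every quoted occurrence of the source path inside the
# repr of the environment values is erased before hashing, which keeps the
# hash identical across machines and checkout locations.
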
def uid(self):
	"""
	Reimplement Task.uid() so that the signature does not depend on local paths
	"""
	try:
		return self.uid_
	except AttributeError:
		m = Utils.md5()
		src = self.generator.bld.srcnode
		up = m.update
		up(self.__class__.__name__.encode())
		for x in self.inputs + self.outputs:
			up(x.path_from(src).encode())
		self.uid_ = m.digest()
		return self.uid_

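# Task identifiers are derived from paths relative to the source directory
# rather than absolute paths, so two checkouts of the same project at
# different locations produce the same uid and can share cache entries.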

def make_cached(cls):
	"""
	Enable the waf cache for a given task class
	"""
	if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
		return

	full_name = "%s.%s" % (cls.__module__, cls.__name__)
	if full_name in ('waflib.Tools.ccroot.vnum', 'waflib.Build.inst'):
		return

	m1 = getattr(cls, 'run', None)
	def run(self):
		if getattr(self, 'nocache', False):
			return m1(self)
		if self.can_retrieve_cache():
			return 0
		return m1(self)
	cls.run = run

	m2 = getattr(cls, 'post_run', None)
	def post_run(self):
		if getattr(self, 'nocache', False):
			return m2(self)
		ret = m2(self)
		self.put_files_cache()
		return ret
	cls.post_run = post_run
	cls.has_cache = True

process_pool = []
def get_process():
	"""
	Returns a worker process that can process waf cache commands
	The worker process is assumed to be returned to the process pool when unused
	"""
	try:
		return process_pool.pop()
	except IndexError:
		filepath = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py'
		cmd = [sys.executable, '-c', Utils.readf(filepath)]
		return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)

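# Workers are spawned by feeding the source of this very file to
# `python -c`, so __name__ == '__main__' holds in the child process and it
# enters the loop() at the bottom of this module, reading one command per
# line on stdin and answering on stdout.
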
def atexit_pool():
	for proc in process_pool:
		proc.kill()
atexit.register(atexit_pool)

def build(bld):
	"""
	Called during the build process to enable file caching
	"""

	if WAFCACHE_ASYNC_WORKERS:
		try:
			num_workers = int(WAFCACHE_ASYNC_WORKERS)
		except ValueError:
			Logs.warn('Invalid WAFCACHE_ASYNC_WORKERS specified: %r' % WAFCACHE_ASYNC_WORKERS)
		else:
			from concurrent.futures import ThreadPoolExecutor
			bld.wafcache_executor = ThreadPoolExecutor(max_workers=num_workers)
			bld.wafcache_uploads = []
			bld.wafcache_procs = set([])
			bld.wafcache_stop = False
			bld.wafcache_lock = threading.Lock()

		def finalize_upload_async(bld):
			if WAFCACHE_ASYNC_NOWAIT:
				with bld.wafcache_lock:
					bld.wafcache_stop = True

				for fut in reversed(bld.wafcache_uploads):
					fut.cancel()

				for proc in bld.wafcache_procs:
					proc.kill()

				bld.wafcache_procs.clear()
			else:
				Logs.pprint('CYAN', '... waiting for wafcache uploads to complete (%s uploads)' % len(bld.wafcache_uploads))
			bld.wafcache_executor.shutdown(wait=True)
		bld.add_post_fun(finalize_upload_async)

	if WAFCACHE_STATS:
		# Init counter for statistics and hook to print results at the end
		bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0

		def printstats(bld):
			hit_ratio = 0
			if bld.cache_reqs > 0:
				hit_ratio = (bld.cache_hits / bld.cache_reqs) * 100
			Logs.pprint('CYAN', '  wafcache stats: %s requests, %s hits (ratio: %.2f%%), %s writes' %
					 (bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts) )
		bld.add_post_fun(printstats)

	if process_pool:
		# already called once
		return

	# pre-allocation
	processes = [get_process() for x in range(bld.jobs)]
	process_pool.extend(processes)

	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Build.BuildContext.hash_env_vars = hash_env_vars
	for x in reversed(list(Task.classes.values())):
		make_cached(x)

def cache_command(proc, sig, files_from, files_to):
	"""
	Create a command for cache worker processes, returns a pickled
	base64-encoded tuple containing the task signature, a list of files to
	cache and a list of files to get from cache (one of the lists
	is assumed to be empty)
	"""
	obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
	proc.stdin.write(obj)
	proc.stdin.write('\n'.encode())
	proc.stdin.flush()
	obj = proc.stdout.readline()
	if not obj:
		raise OSError('Preforked sub-process %r died' % proc.pid)
	return cPickle.loads(base64.b64decode(obj))

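# Illustration of the wire format (the key below is a made-up example): a
# request is a single line holding a base64-encoded pickle of
# [signature, files_from, files_to], e.g.
#   >>> import base64, pickle
#   >>> base64.b64encode(pickle.dumps(['0b18...', ['build/a.o'], []]))
# and the worker answers with one base64 line containing the pickled result
# string ('ok' on success, a formatted traceback otherwise).
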
try:
	copyfun = os.link
except AttributeError:
	# hard links are not available on every platform
	copyfun = shutil.copy2

def atomic_copy(orig, dest):
	"""
	Copy files to the cache, the operation is atomic for a given file
	"""
	global copyfun
	tmp = dest + '.tmp'
	up = os.path.dirname(dest)
	try:
		os.makedirs(up)
	except OSError:
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno == errno.EXDEV:
			copyfun = shutil.copy2
			copyfun(orig, tmp)
		else:
			raise
	os.rename(tmp, dest)

def lru_trim():
	"""
	the cache folders take the form:
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`
	they are listed in order of last access, and then removed
	until the amount of folders is within TRIM_MAX_FOLDERS and the total space
	taken by files is less than EVICT_MAX_BYTES
	"""
	lst = []
	for up in os.listdir(CACHE_DIR):
		if len(up) == 2:
			sub = os.path.join(CACHE_DIR, up)
			for hval in os.listdir(sub):
				path = os.path.join(sub, hval)

				size = 0
				for fname in os.listdir(path):
					try:
						size += os.lstat(os.path.join(path, fname)).st_size
					except OSError:
						pass
				lst.append((os.stat(path).st_mtime, size, path))

	lst.sort(key=lambda x: x[0])
	lst.reverse()

	tot = sum(x[1] for x in lst)
	while tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS:
		_, tmp_size, path = lst.pop()
		tot -= tmp_size

		tmp = path + '.remove'
		try:
			shutil.rmtree(tmp)
		except OSError:
			pass
		try:
			os.rename(path, tmp)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, tmp))
		else:
			try:
				shutil.rmtree(tmp)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % tmp)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))


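# Eviction renames a folder to '<path>.remove' before deleting it: an
# interrupted removal is retried on a later pass (the initial rmtree on the
# '.remove' name), and since the rename is atomic, concurrent readers never
# see a half-deleted entry under its original name.
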
def lru_evict():
	"""
	Reduce the cache size
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes if the cache is too big
		# O_CLOEXEC is unnecessary because no processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			try:
				fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
			except EnvironmentError:
				if WAFCACHE_VERBOSITY:
					sys.stderr.write('wafcache: another cleaning process is running\n')
			else:
				# now do the actual cleanup
				lru_trim()
				os.utime(lockfile, None)
		finally:
			os.close(fd)

class netcache(object):
	def __init__(self):
		self.http = urllib3.PoolManager()

	def url_of(self, sig, i):
		return "%s/%s/%s" % (CACHE_DIR, sig, i)

	def upload(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with open(file_path, 'rb') as f:
			file_data = f.read()
		r = self.http.request('POST', url, timeout=60,
			fields={ 'file': ('%s/%s' % (sig, i), file_data), })
		if r.status >= 400:
			raise OSError("Invalid status %r %r" % (url, r.status))

	def download(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with self.http.request('GET', url, preload_content=False, timeout=60) as inf:
			if inf.status >= 400:
				raise OSError("Invalid status %r %r" % (url, inf.status))
			with open(file_path, 'wb') as out:
				shutil.copyfileobj(inf, out)

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				if not os.path.islink(x):
					self.upload(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				self.download(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

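# In this mode CACHE_DIR holds the WAFCACHE URL verbatim, so url_of() composes
# addresses such as http://localhost:8080/files/<cache key>/<output index>,
# matching the example given in the module docstring.
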
class fcache(object):
	def __init__(self):
		if not os.path.exists(CACHE_DIR):
			try:
				os.makedirs(CACHE_DIR)
			except OSError:
				pass
		if not os.path.exists(CACHE_DIR):
			raise ValueError('Could not initialize the cache directory')

	def copy_to_cache(self, sig, files_from, files_to):
		"""
		Copy files to the cache, existing files are overwritten,
		and the copy is atomic only for a given file, not for all files
		that belong to a given task object
		"""
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		else:
			# attempt trimming if caching was successful:
			# we may have things to trim!
			try:
				lru_evict()
			except Exception:
				return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""
		Copy files from the cache
		"""
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(orig, x)

			# success! update the cache time
			os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
		except Exception:
			return traceback.format_exc()
		return OK

class bucket_cache(object):
	def bucket_copy(self, source, target):
		if WAFCACHE_CMD:
			def replacer(match):
				if match.group('src'):
					return source
				elif match.group('tgt'):
					return target
			cmd = [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)]
		elif CACHE_DIR.startswith('s3://'):
			cmd = ['aws', 's3', 'cp', source, target]
		elif CACHE_DIR.startswith('gs://'):
			cmd = ['gsutil', 'cp', source, target]
		else:
			cmd = ['mc', 'cp', source, target]

		proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = proc.communicate()
		if proc.returncode:
			raise OSError('Error copying %r to %r using: %r (exit %r):\n  out:%s\n  err:%s' % (
				source, target, cmd, proc.returncode, out.decode(errors='replace'), err.decode(errors='replace')))

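	# Example of the %{SRC}/%{TGT} substitution above, re-using the command
	# from the module docstring: with WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}",
	# an upload of output 0 runs something like
	#   gsutil cp build/myprogram gs://mybucket/aa/aaaaa.../0
	# while a download swaps the bucket path into %{SRC} instead.
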
	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(orig, x)
		except EnvironmentError:
			return traceback.format_exc()
		return OK

def loop(service):
	"""
	This function is run when this file is run as a standalone python script,
	it assumes a parent process that will communicate the commands to it
	as pickle-encoded tuples (one line per command)

	The commands are to copy files to the cache or copy files from the
	cache to a target destination
	"""
	# one operation is performed at a time by a single process
	# therefore stdin never has more than one line
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)
	ret = OK

	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# TODO return early when pushing files upstream
		ret = service.copy_to_cache(sig, files_from, files_to)
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = service.copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	obj = base64.b64encode(cPickle.dumps(ret))
	sys.stdout.write(obj.decode())
	sys.stdout.write('\n')
	sys.stdout.flush()

if __name__ == '__main__':
	if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://') or CACHE_DIR.startswith('minio://'):
		if CACHE_DIR.startswith('minio://'):
			CACHE_DIR = CACHE_DIR[8:]   # minio doesn't need the protocol part, uses config aliases
		service = bucket_cache()
	elif CACHE_DIR.startswith('http'):
		service = netcache()
	else:
		service = fcache()
	while 1:
		try:
			loop(service)
		except KeyboardInterrupt:
			break