waf

FORK: waf with some random patches
git clone https://git.neptards.moe/neptards/waf.git

netcache_client.py (9095B)


#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2011-2015 (ita)

      5 """
      6 A client for the network cache (playground/netcache/). Launch the server with:
      7 ./netcache_server, then use it for the builds by adding the following:
      8 
      9 	def build(bld):
     10 		bld.load('netcache_client')
     11 
     12 The parameters should be present in the environment in the form:
     13 	NETCACHE=host:port waf configure build
     14 
     15 Or in a more detailed way:
     16 	NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build
     17 
     18 where:
     19 	host: host where the server resides, by default localhost
     20 	port: by default push on 11001 and pull on 12001
     21 
     22 Use the server provided in playground/netcache/Netcache.java
     23 """

import os, socket, time, atexit, sys
from waflib import Task, Logs, Utils, Build, Runner
from waflib.Configure import conf

BUF = 8192 * 16
HEADER_SIZE = 128
MODES = ['PUSH', 'PULL', 'PUSH_PULL']
STALE_TIME = 30 # seconds

GET = 'GET'
PUT = 'PUT'
LST = 'LST'
BYE = 'BYE'
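
# All requests below share one framing rule: a header of exactly HEADER_SIZE
# bytes made of comma-separated fields and padded with ljust(), optionally
# followed by a raw payload (see put_data() and read_header()).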

all_sigs_in_cache = (0.0, [])

def put_data(conn, data):
	if sys.hexversion > 0x3000000:
		data = data.encode('latin-1')
	cnt = 0
	while cnt < len(data):
		sent = conn.send(data[cnt:])
		if sent == 0:
			raise RuntimeError('connection ended')
		cnt += sent

push_connections = Runner.Queue(0)
pull_connections = Runner.Queue(0)
def get_connection(push=False):
	# return a connection from the pool, or create a new one... do not forget to release it!
	try:
		if push:
			ret = push_connections.get(block=False)
		else:
			ret = pull_connections.get(block=False)
	except Exception:
		ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		if push:
			ret.connect(Task.push_addr)
		else:
			ret.connect(Task.pull_addr)
	return ret

def release_connection(conn, msg='', push=False):
	if conn:
		if push:
			push_connections.put(conn)
		else:
			pull_connections.put(conn)

def close_connection(conn, msg=''):
	if conn:
		data = '%s,%s' % (BYE, msg)
		try:
			put_data(conn, data.ljust(HEADER_SIZE))
		except:
			pass
		try:
			conn.close()
		except:
			pass

def close_all():
	for q in (push_connections, pull_connections):
		while q.qsize():
			conn = q.get()
			try:
				close_connection(conn)
			except:
				# ignore errors when cleaning up
				pass
atexit.register(close_all)

def read_header(conn):
	cnt = 0
	buf = []
	while cnt < HEADER_SIZE:
		data = conn.recv(HEADER_SIZE - cnt)
		if not data:
			#import traceback
			#traceback.print_stack()
			raise ValueError('connection ended when reading a header %r' % buf)
		buf.append(data)
		cnt += len(data)
	if sys.hexversion > 0x3000000:
		ret = ''.encode('latin-1').join(buf)
		ret = ret.decode('latin-1')
	else:
		ret = ''.join(buf)
	return ret

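# Replies are framed the same way: read_header() returns a HEADER_SIZE-byte
# header whose first comma-separated field is the payload size (-1 on a GET
# means the requested file is not in the cache), and the payload follows as
# raw bytes on the same connection.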
def check_cache(conn, ssig):
	"""
	List the files on the server; this is an optimization that assumes
	concurrent builds are rare.
	"""
	global all_sigs_in_cache
	if not STALE_TIME:
		return
	if time.time() - all_sigs_in_cache[0] > STALE_TIME:

		params = (LST, '')
		put_data(conn, ','.join(params).ljust(HEADER_SIZE))

		# read what is coming back
		ret = read_header(conn)
		size = int(ret.split(',')[0])

		buf = []
		cnt = 0
		while cnt < size:
			data = conn.recv(min(BUF, size-cnt))
			if not data:
				raise ValueError('connection ended %r %r' % (cnt, size))
			buf.append(data)
			cnt += len(data)

		if sys.hexversion > 0x3000000:
			ret = ''.encode('latin-1').join(buf)
			ret = ret.decode('latin-1')
		else:
			ret = ''.join(buf)

		all_sigs_in_cache = (time.time(), ret.splitlines())
		Logs.debug('netcache: server cache has %r entries', len(all_sigs_in_cache[1]))

	if ssig not in all_sigs_in_cache[1]:
		raise ValueError('no file %s in cache' % ssig)

class MissingFile(Exception):
	pass

def recv_file(conn, ssig, count, p):
	check_cache(conn, ssig)

	params = (GET, ssig, str(count))
	put_data(conn, ','.join(params).ljust(HEADER_SIZE))
	data = read_header(conn)

	size = int(data.split(',')[0])

	if size == -1:
		raise MissingFile('no file %s - %s in cache' % (ssig, count))

	# get the file, writing immediately
	# TODO a tmp file would be better
	f = open(p, 'wb')
	cnt = 0
	while cnt < size:
		data = conn.recv(min(BUF, size-cnt))
		if not data:
			raise ValueError('connection ended %r %r' % (cnt, size))
		f.write(data)
		cnt += len(data)
	f.close()

def sock_send(conn, ssig, cnt, p):
	#print "pushing %r %r %r" % (ssig, cnt, p)
	size = os.stat(p).st_size
	params = (PUT, ssig, str(cnt), str(size))
	put_data(conn, ','.join(params).ljust(HEADER_SIZE))
	f = open(p, 'rb')
	cnt = 0
	while cnt < size:
		r = f.read(min(BUF, size-cnt))
		while r:
			k = conn.send(r)
			if not k:
				raise ValueError('connection ended')
			cnt += k
			r = r[k:]

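# The functions below are not used directly: setup_netcache() binds
# can_retrieve_cache, put_files_cache and uid onto Task.Task, and
# hash_env_vars onto Build.BuildContext.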
def can_retrieve_cache(self):
	if not Task.pull_addr:
		return False
	if not self.outputs:
		return False
	self.cached = False

	cnt = 0
	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	conn = None
	err = False
	try:
		try:
			conn = get_connection()
			for node in self.outputs:
				p = node.abspath()
				recv_file(conn, ssig, cnt, p)
				cnt += 1
		except MissingFile as e:
			Logs.debug('netcache: file is not in the cache %r', e)
			err = True
		except Exception as e:
			Logs.debug('netcache: could not get the files %r', self.outputs)
			if Logs.verbose > 1:
				Logs.debug('netcache: exception %r', e)
			err = True

			# broken connection? remove this one
			close_connection(conn)
			conn = None
		else:
			Logs.debug('netcache: obtained %r from cache', self.outputs)

	finally:
		release_connection(conn)
	if err:
		return False

	self.cached = True
	return True

@Utils.run_once
def put_files_cache(self):
	if not Task.push_addr:
		return
	if not self.outputs:
		return
	if getattr(self, 'cached', None):
		return

	#print "called put_files_cache", id(self)
	bld = self.generator.bld
	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	conn = None
	cnt = 0
	try:
		for node in self.outputs:
			# We could re-create the task signature from the signatures of the outputs;
			# in practice, this means hashing the output files, which is unnecessary here
			try:
				if not conn:
					conn = get_connection(push=True)
				sock_send(conn, ssig, cnt, node.abspath())
				Logs.debug('netcache: sent %r', node)
			except Exception as e:
				Logs.debug('netcache: could not push the files %r', e)

				# broken connection? remove this one
				close_connection(conn)
				conn = None
			cnt += 1
	finally:
		release_connection(conn, push=True)

	bld.task_sigs[self.uid()] = self.cache_sig

def hash_env_vars(self, env, vars_lst):
	# reimplement so that the resulting hash does not depend on local paths
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	idx = str(id(env)) + str(vars_lst)
	try:
		cache = self.cache_env
	except AttributeError:
		cache = self.cache_env = {}
	else:
		try:
			return self.cache_env[idx]
		except KeyError:
			pass

	v = str([env[a] for a in vars_lst])
	v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
	m = Utils.md5()
	m.update(v.encode())
	ret = m.digest()

	Logs.debug('envhash: %r %r', ret, v)

	cache[idx] = ret

	return ret

def uid(self):
	# reimplement so that the signature does not depend on local paths
	try:
		return self.uid_
	except AttributeError:
		m = Utils.md5()
		src = self.generator.bld.srcnode
		up = m.update
		up(self.__class__.__name__.encode())
		for x in self.inputs + self.outputs:
			up(x.path_from(src).encode())
		self.uid_ = m.digest()
		return self.uid_


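# make_cached() wraps the run() and post_run() methods of a task class:
# run() first tries to retrieve the outputs from the cache, and post_run()
# pushes the outputs to the cache after an actual build (unless 'nocache'
# is set on the class or on the task instance).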
def make_cached(cls):
	if getattr(cls, 'nocache', None):
		return

	m1 = cls.run
	def run(self):
		if getattr(self, 'nocache', False):
			return m1(self)
		if self.can_retrieve_cache():
			return 0
		return m1(self)
	cls.run = run

	m2 = cls.post_run
	def post_run(self):
		if getattr(self, 'nocache', False):
			return m2(self)
		bld = self.generator.bld
		ret = m2(self)
		if bld.cache_global:
			self.put_files_cache()
		if hasattr(self, 'chmod'):
			for node in self.outputs:
				os.chmod(node.abspath(), self.chmod)
		return ret
	cls.post_run = post_run

@conf
def setup_netcache(ctx, push_addr, pull_addr):
	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Task.push_addr = push_addr
	Task.pull_addr = pull_addr
	Build.BuildContext.hash_env_vars = hash_env_vars
	ctx.cache_global = True

	for x in Task.classes.values():
		make_cached(x)

def build(bld):
	if 'NETCACHE' not in os.environ and 'NETCACHE_PULL' not in os.environ and 'NETCACHE_PUSH' not in os.environ:
		Logs.warn('Setting NETCACHE_PULL=127.0.0.1:12001 and NETCACHE_PUSH=127.0.0.1:11001')
		os.environ['NETCACHE_PULL'] = '127.0.0.1:12001'
		os.environ['NETCACHE_PUSH'] = '127.0.0.1:11001'

	if 'NETCACHE' in os.environ:
		if 'NETCACHE_PUSH' not in os.environ:
			os.environ['NETCACHE_PUSH'] = os.environ['NETCACHE']
		if 'NETCACHE_PULL' not in os.environ:
			os.environ['NETCACHE_PULL'] = os.environ['NETCACHE']

	v = os.environ['NETCACHE_PULL']
	if v:
		h, p = v.split(':')
		pull_addr = (h, int(p))
	else:
		pull_addr = None

	v = os.environ['NETCACHE_PUSH']
	if v:
		h, p = v.split(':')
		push_addr = (h, int(p))
	else:
		push_addr = None

	setup_netcache(bld, push_addr, pull_addr)