You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
waf/waflib/extras/netcache_client.py

391 lines
8.9 KiB
Python

#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2011-2015 (ita)
"""
A client for the network cache (playground/netcache/). Launch the server with:
./netcache_server, then use it for the builds by adding the following:
def build(bld):
bld.load('netcache_client')
The parameters should be present in the environment in the form:
NETCACHE=host:port waf configure build
Or in a more detailed way:
NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build
where:
host: host where the server resides, by default localhost
port: by default push on 11001 and pull on 12001
Use the server provided in playground/netcache/Netcache.java
"""
import os, socket, time, atexit, sys
from waflib import Task, Logs, Utils, Build, Runner
from waflib.Configure import conf
BUF = 8192 * 16
HEADER_SIZE = 128
MODES = ['PUSH', 'PULL', 'PUSH_PULL']
STALE_TIME = 30 # seconds
GET = 'GET'
PUT = 'PUT'
LST = 'LST'
BYE = 'BYE'
all_sigs_in_cache = (0.0, [])
def put_data(conn, data):
if sys.hexversion > 0x3000000:
data = data.encode('latin-1')
cnt = 0
while cnt < len(data):
sent = conn.send(data[cnt:])
if sent == 0:
raise RuntimeError('connection ended')
cnt += sent
push_connections = Runner.Queue(0)
pull_connections = Runner.Queue(0)
def get_connection(push=False):
# return a new connection... do not forget to release it!
try:
if push:
ret = push_connections.get(block=False)
else:
ret = pull_connections.get(block=False)
except Exception:
ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if push:
ret.connect(Task.push_addr)
else:
ret.connect(Task.pull_addr)
return ret
def release_connection(conn, msg='', push=False):
if conn:
if push:
push_connections.put(conn)
else:
pull_connections.put(conn)
def close_connection(conn, msg=''):
if conn:
data = '%s,%s' % (BYE, msg)
try:
put_data(conn, data.ljust(HEADER_SIZE))
except:
pass
try:
conn.close()
except:
pass
def close_all():
for q in (push_connections, pull_connections):
while q.qsize():
conn = q.get()
try:
close_connection(conn)
except:
# ignore errors when cleaning up
pass
atexit.register(close_all)
def read_header(conn):
cnt = 0
buf = []
while cnt < HEADER_SIZE:
data = conn.recv(HEADER_SIZE - cnt)
if not data:
#import traceback
#traceback.print_stack()
raise ValueError('connection ended when reading a header %r' % buf)
buf.append(data)
cnt += len(data)
if sys.hexversion > 0x3000000:
ret = ''.encode('latin-1').join(buf)
ret = ret.decode('latin-1')
else:
ret = ''.join(buf)
return ret
def check_cache(conn, ssig):
"""
List the files on the server, this is an optimization because it assumes that
concurrent builds are rare
"""
global all_sigs_in_cache
if not STALE_TIME:
return
if time.time() - all_sigs_in_cache[0] > STALE_TIME:
params = (LST,'')
put_data(conn, ','.join(params).ljust(HEADER_SIZE))
# read what is coming back
ret = read_header(conn)
size = int(ret.split(',')[0])
buf = []
cnt = 0
while cnt < size:
data = conn.recv(min(BUF, size-cnt))
if not data:
raise ValueError('connection ended %r %r' % (cnt, size))
buf.append(data)
cnt += len(data)
if sys.hexversion > 0x3000000:
ret = ''.encode('latin-1').join(buf)
ret = ret.decode('latin-1')
else:
ret = ''.join(buf)
all_sigs_in_cache = (time.time(), ret.splitlines())
Logs.debug('netcache: server cache has %r entries', len(all_sigs_in_cache[1]))
if not ssig in all_sigs_in_cache[1]:
raise ValueError('no file %s in cache' % ssig)
class MissingFile(Exception):
pass
def recv_file(conn, ssig, count, p):
check_cache(conn, ssig)
params = (GET, ssig, str(count))
put_data(conn, ','.join(params).ljust(HEADER_SIZE))
data = read_header(conn)
size = int(data.split(',')[0])
if size == -1:
raise MissingFile('no file %s - %s in cache' % (ssig, count))
# get the file, writing immediately
# TODO a tmp file would be better
f = open(p, 'wb')
cnt = 0
while cnt < size:
data = conn.recv(min(BUF, size-cnt))
if not data:
raise ValueError('connection ended %r %r' % (cnt, size))
f.write(data)
cnt += len(data)
f.close()
def sock_send(conn, ssig, cnt, p):
#print "pushing %r %r %r" % (ssig, cnt, p)
size = os.stat(p).st_size
params = (PUT, ssig, str(cnt), str(size))
put_data(conn, ','.join(params).ljust(HEADER_SIZE))
f = open(p, 'rb')
cnt = 0
while cnt < size:
r = f.read(min(BUF, size-cnt))
while r:
k = conn.send(r)
if not k:
raise ValueError('connection ended')
cnt += k
r = r[k:]
def can_retrieve_cache(self):
if not Task.pull_addr:
return False
if not self.outputs:
return False
self.cached = False
cnt = 0
sig = self.signature()
ssig = Utils.to_hex(self.uid() + sig)
conn = None
err = False
try:
try:
conn = get_connection()
for node in self.outputs:
p = node.abspath()
recv_file(conn, ssig, cnt, p)
cnt += 1
except MissingFile as e:
Logs.debug('netcache: file is not in the cache %r', e)
err = True
except Exception as e:
Logs.debug('netcache: could not get the files %r', self.outputs)
if Logs.verbose > 1:
Logs.debug('netcache: exception %r', e)
err = True
# broken connection? remove this one
close_connection(conn)
conn = None
else:
Logs.debug('netcache: obtained %r from cache', self.outputs)
finally:
release_connection(conn)
if err:
return False
self.cached = True
return True
@Utils.run_once
def put_files_cache(self):
if not Task.push_addr:
return
if not self.outputs:
return
if getattr(self, 'cached', None):
return
#print "called put_files_cache", id(self)
bld = self.generator.bld
sig = self.signature()
ssig = Utils.to_hex(self.uid() + sig)
conn = None
cnt = 0
try:
for node in self.outputs:
# We could re-create the signature of the task with the signature of the outputs
# in practice, this means hashing the output files
# this is unnecessary
try:
if not conn:
conn = get_connection(push=True)
sock_send(conn, ssig, cnt, node.abspath())
Logs.debug('netcache: sent %r', node)
except Exception as e:
Logs.debug('netcache: could not push the files %r', e)
# broken connection? remove this one
close_connection(conn)
conn = None
cnt += 1
finally:
release_connection(conn, push=True)
bld.task_sigs[self.uid()] = self.cache_sig
def hash_env_vars(self, env, vars_lst):
# reimplement so that the resulting hash does not depend on local paths
if not env.table:
env = env.parent
if not env:
return Utils.SIG_NIL
idx = str(id(env)) + str(vars_lst)
try:
cache = self.cache_env
except AttributeError:
cache = self.cache_env = {}
else:
try:
return self.cache_env[idx]
except KeyError:
pass
v = str([env[a] for a in vars_lst])
v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
m = Utils.md5()
m.update(v.encode())
ret = m.digest()
Logs.debug('envhash: %r %r', ret, v)
cache[idx] = ret
return ret
def uid(self):
# reimplement so that the signature does not depend on local paths
try:
return self.uid_
except AttributeError:
m = Utils.md5()
src = self.generator.bld.srcnode
up = m.update
up(self.__class__.__name__.encode())
for x in self.inputs + self.outputs:
up(x.path_from(src).encode())
self.uid_ = m.digest()
return self.uid_
def make_cached(cls):
if getattr(cls, 'nocache', None):
return
m1 = cls.run
def run(self):
if getattr(self, 'nocache', False):
return m1(self)
if self.can_retrieve_cache():
return 0
return m1(self)
cls.run = run
m2 = cls.post_run
def post_run(self):
if getattr(self, 'nocache', False):
return m2(self)
bld = self.generator.bld
ret = m2(self)
if bld.cache_global:
self.put_files_cache()
if hasattr(self, 'chmod'):
for node in self.outputs:
os.chmod(node.abspath(), self.chmod)
return ret
cls.post_run = post_run
@conf
def setup_netcache(ctx, push_addr, pull_addr):
Task.Task.can_retrieve_cache = can_retrieve_cache
Task.Task.put_files_cache = put_files_cache
Task.Task.uid = uid
Task.push_addr = push_addr
Task.pull_addr = pull_addr
Build.BuildContext.hash_env_vars = hash_env_vars
ctx.cache_global = True
for x in Task.classes.values():
make_cached(x)
def build(bld):
if not 'NETCACHE' in os.environ and not 'NETCACHE_PULL' in os.environ and not 'NETCACHE_PUSH' in os.environ:
Logs.warn('Setting NETCACHE_PULL=127.0.0.1:11001 and NETCACHE_PUSH=127.0.0.1:12001')
os.environ['NETCACHE_PULL'] = '127.0.0.1:12001'
os.environ['NETCACHE_PUSH'] = '127.0.0.1:11001'
if 'NETCACHE' in os.environ:
if not 'NETCACHE_PUSH' in os.environ:
os.environ['NETCACHE_PUSH'] = os.environ['NETCACHE']
if not 'NETCACHE_PULL' in os.environ:
os.environ['NETCACHE_PULL'] = os.environ['NETCACHE']
v = os.environ['NETCACHE_PULL']
if v:
h, p = v.split(':')
pull_addr = (h, int(p))
else:
pull_addr = None
v = os.environ['NETCACHE_PUSH']
if v:
h, p = v.split(':')
push_addr = (h, int(p))
else:
push_addr = None
setup_netcache(bld, push_addr, pull_addr)