waf

FORK: waf with some random patches
git clone https://git.neptards.moe/neptards/waf.git
Log | Files | Refs | README

Runner.py (16394B)


      1 #!/usr/bin/env python
      2 # encoding: utf-8
      3 # Thomas Nagy, 2005-2018 (ita)
      4 
      5 """
      6 Runner.py: Task scheduling and execution
      7 """
      8 
      9 import heapq, traceback
# Compatibility shim: Python 3 provides the 'queue' module; Python 2 names it
# 'Queue', and very old Python 2 releases lack PriorityQueue entirely, in which
# case a minimal heapq-based drop-in with the same interface is defined below.
try:
	from queue import Queue, PriorityQueue
except ImportError:
	from Queue import Queue
	try:
		from Queue import PriorityQueue
	except ImportError:
		class PriorityQueue(Queue):
			# _init/_put/_get are the Queue template methods; the base class
			# calls them while holding the queue lock, so no extra locking here
			def _init(self, maxsize):
				self.maxsize = maxsize
				self.queue = []
			def _put(self, item):
				heapq.heappush(self.queue, item)
			def _get(self):
				return heapq.heappop(self.queue)
     25 
     26 from waflib import Utils, Task, Errors, Logs
     27 
     28 GAP = 5
     29 """
     30 Wait for at least ``GAP * njobs`` before trying to enqueue more tasks to run
     31 """
     32 
     33 class PriorityTasks(object):
     34 	def __init__(self):
     35 		self.lst = []
     36 	def __len__(self):
     37 		return len(self.lst)
     38 	def __iter__(self):
     39 		return iter(self.lst)
     40 	def __str__(self):
     41 		return 'PriorityTasks: [%s]' % '\n  '.join(str(x) for x in self.lst)
     42 	def clear(self):
     43 		self.lst = []
     44 	def append(self, task):
     45 		heapq.heappush(self.lst, task)
     46 	def appendleft(self, task):
     47 		"Deprecated, do not use"
     48 		heapq.heappush(self.lst, task)
     49 	def pop(self):
     50 		return heapq.heappop(self.lst)
     51 	def extend(self, lst):
     52 		if self.lst:
     53 			for x in lst:
     54 				self.append(x)
     55 		else:
     56 			if isinstance(lst, list):
     57 				self.lst = lst
     58 				heapq.heapify(lst)
     59 			else:
     60 				self.lst = lst.lst
     61 
     62 class Consumer(Utils.threading.Thread):
     63 	"""
     64 	Daemon thread object that executes a task. It shares a semaphore with
     65 	the coordinator :py:class:`waflib.Runner.Spawner`. There is one
     66 	instance per task to consume.
     67 	"""
     68 	def __init__(self, spawner, task):
     69 		Utils.threading.Thread.__init__(self)
     70 		self.task = task
     71 		"""Task to execute"""
     72 		self.spawner = spawner
     73 		"""Coordinator object"""
     74 		self.daemon = True
     75 		self.start()
     76 	def run(self):
     77 		"""
     78 		Processes a single task
     79 		"""
     80 		try:
     81 			if not self.spawner.master.stop:
     82 				self.spawner.master.process_task(self.task)
     83 		finally:
     84 			self.spawner.sem.release()
     85 			self.spawner.master.out.put(self.task)
     86 			self.task = None
     87 			self.spawner = None
     88 
     89 class Spawner(Utils.threading.Thread):
     90 	"""
     91 	Daemon thread that consumes tasks from :py:class:`waflib.Runner.Parallel` producer and
     92 	spawns a consuming thread :py:class:`waflib.Runner.Consumer` for each
     93 	:py:class:`waflib.Task.Task` instance.
     94 	"""
     95 	def __init__(self, master):
     96 		Utils.threading.Thread.__init__(self)
     97 		self.master = master
     98 		""":py:class:`waflib.Runner.Parallel` producer instance"""
     99 		self.sem = Utils.threading.Semaphore(master.numjobs)
    100 		"""Bounded semaphore that prevents spawning more than *n* concurrent consumers"""
    101 		self.daemon = True
    102 		self.start()
    103 	def run(self):
    104 		"""
    105 		Spawns new consumers to execute tasks by delegating to :py:meth:`waflib.Runner.Spawner.loop`
    106 		"""
    107 		try:
    108 			self.loop()
    109 		except Exception:
    110 			# Python 2 prints unnecessary messages when shutting down
    111 			# we also want to stop the thread properly
    112 			pass
    113 	def loop(self):
    114 		"""
    115 		Consumes task objects from the producer; ends when the producer has no more
    116 		task to provide.
    117 		"""
    118 		master = self.master
    119 		while 1:
    120 			task = master.ready.get()
    121 			self.sem.acquire()
    122 			if not master.stop:
    123 				task.log_display(task.generator.bld)
    124 			Consumer(self, task)
    125 
    126 class Parallel(object):
    127 	"""
    128 	Schedule the tasks obtained from the build context for execution.
    129 	"""
	def __init__(self, bld, j=2):
		"""
		The initialization requires a build context reference
		for computing the total number of jobs.
		"""

		self.numjobs = j
		"""
		Amount of parallel consumers to use
		"""

		self.bld = bld
		"""
		Instance of :py:class:`waflib.Build.BuildContext`
		"""

		self.outstanding = PriorityTasks()
		"""Heap of :py:class:`waflib.Task.Task` that may be ready to be executed"""

		self.postponed = PriorityTasks()
		"""Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons"""

		self.incomplete = set()
		"""List of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)"""

		self.ready = PriorityQueue(0)
		"""List of :py:class:`waflib.Task.Task` ready to be executed by consumers"""

		self.out = Queue(0)
		"""List of :py:class:`waflib.Task.Task` returned by the task consumers"""

		self.count = 0
		"""Amount of tasks that may be processed by :py:class:`waflib.Runner.TaskConsumer`"""

		self.processed = 0
		"""Amount of tasks processed"""

		self.stop = False
		"""Error flag to stop the build"""

		self.error = []
		"""Tasks that could not be executed"""

		self.biter = None
		"""Task iterator which must give groups of parallelizable tasks when calling ``next()``"""

		self.dirty = False
		"""
		Flag that indicates that the build cache must be saved when a task was executed
		(calls :py:meth:`waflib.Build.BuildContext.store`)"""

		self.revdeps = Utils.defaultdict(set)
		"""
		The reverse dependency graph of dependencies obtained from Task.run_after
		"""

		self.spawner = None
		"""
		Coordinating daemon thread that spawns thread consumers
		"""
		# serial builds (-j1) run tasks inline in _add_task and need no threads
		if self.numjobs > 1:
			self.spawner = Spawner(self)
    192 
    193 	def get_next_task(self):
    194 		"""
    195 		Obtains the next Task instance to run
    196 
    197 		:rtype: :py:class:`waflib.Task.Task`
    198 		"""
    199 		if not self.outstanding:
    200 			return None
    201 		return self.outstanding.pop()
    202 
    203 	def postpone(self, tsk):
    204 		"""
    205 		Adds the task to the list :py:attr:`waflib.Runner.Parallel.postponed`.
    206 		The order is scrambled so as to consume as many tasks in parallel as possible.
    207 
    208 		:param tsk: task instance
    209 		:type tsk: :py:class:`waflib.Task.Task`
    210 		"""
    211 		self.postponed.append(tsk)
    212 
	def refill_task_list(self):
		"""
		Pulls a next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`.
		Ensures that all tasks in the current build group are complete before processing the next one.
		"""
		# throttle the producer: keep at most GAP * numjobs tasks in flight
		while self.count > self.numjobs * GAP:
			self.get_out()

		while not self.outstanding:
			if self.count:
				# tasks are still running; collecting one may unblock others
				self.get_out()
				if self.outstanding:
					break
			elif self.postponed:
				try:
					cond = self.deadlock == self.processed
				except AttributeError:
					# first pass: self.deadlock has not been set yet
					pass
				else:
					if cond:
						# The most common reason is conflicting build order declaration
						# for example: "X run_after Y" and "Y run_after X"
						# Another can be changing "run_after" dependencies while the build is running
						# for example: updating "tsk.run_after" in the "runnable_status" method
						lst = []
						for tsk in self.postponed:
							deps = [id(x) for x in tsk.run_after if not x.hasrun]
							lst.append('%s\t-> %r' % (repr(tsk), deps))
							if not deps:
								lst.append('\n  task %r dependencies are done, check its *runnable_status*?' % id(tsk))
						raise Errors.WafError('Deadlock detected: check the task build order%s' % ''.join(lst))
				# remember the progress counter; if it has not advanced by the
				# next time only postponed tasks remain, we are deadlocked
				self.deadlock = self.processed

			if self.postponed:
				self.outstanding.extend(self.postponed)
				self.postponed.clear()
			elif not self.count:
				if self.incomplete:
					for x in self.incomplete:
						for k in x.run_after:
							if not k.hasrun:
								break
						else:
							# dependency added after the build started without updating revdeps
							# (removing while iterating is safe: the loop breaks right after)
							self.incomplete.remove(x)
							self.outstanding.append(x)
							break
					else:
						if self.stop or self.error:
							break
						raise Errors.WafError('Broken revdeps detected on %r' % self.incomplete)
				else:
					# current group exhausted: advance to the next build group
					tasks = next(self.biter)
					ready, waiting = self.prio_and_split(tasks)
					self.outstanding.extend(ready)
					self.incomplete.update(waiting)
					self.total = self.bld.total()
					break
    271 
    272 	def add_more_tasks(self, tsk):
    273 		"""
    274 		If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
    275 		in that list are added to the current build and will be processed before the next build group.
    276 
    277 		The priorities for dependent tasks are not re-calculated globally
    278 
    279 		:param tsk: task instance
    280 		:type tsk: :py:attr:`waflib.Task.Task`
    281 		"""
    282 		if getattr(tsk, 'more_tasks', None):
    283 			more = set(tsk.more_tasks)
    284 			groups_done = set()
    285 			def iteri(a, b):
    286 				for x in a:
    287 					yield x
    288 				for x in b:
    289 					yield x
    290 
    291 			# Update the dependency tree
    292 			# this assumes that task.run_after values were updated
    293 			for x in iteri(self.outstanding, self.incomplete):
    294 				for k in x.run_after:
    295 					if isinstance(k, Task.TaskGroup):
    296 						if k not in groups_done:
    297 							groups_done.add(k)
    298 							for j in k.prev & more:
    299 								self.revdeps[j].add(k)
    300 					elif k in more:
    301 						self.revdeps[k].add(x)
    302 
    303 			ready, waiting = self.prio_and_split(tsk.more_tasks)
    304 			self.outstanding.extend(ready)
    305 			self.incomplete.update(waiting)
    306 			self.total += len(tsk.more_tasks)
    307 
	def mark_finished(self, tsk):
		# Propagates the completion of tsk through the reverse dependency graph,
		# moving unblocked tasks from 'incomplete' to 'outstanding', and
		# releases any semaphore slot the task held
		def try_unfreeze(x):
			# DAG ancestors are likely to be in the incomplete set
			# This assumes that the run_after contents have not changed
			# after the build starts, else a deadlock may occur
			if x in self.incomplete:
				# TODO remove dependencies to free some memory?
				# x.run_after.remove(tsk)
				for k in x.run_after:
					if not k.hasrun:
						break
				else:
					self.incomplete.remove(x)
					self.outstanding.append(x)

		if tsk in self.revdeps:
			for x in self.revdeps[tsk]:
				if isinstance(x, Task.TaskGroup):
					# group barrier: it opens only once all predecessors (prev) ran
					x.prev.remove(tsk)
					if not x.prev:
						for k in x.next:
							# TODO necessary optimization?
							k.run_after.remove(x)
							try_unfreeze(k)
						# TODO necessary optimization?
						x.next = []
				else:
					try_unfreeze(x)
			del self.revdeps[tsk]

		if hasattr(tsk, 'semaphore'):
			# release the slot, then re-enqueue tasks parked on this semaphore
			sem = tsk.semaphore
			try:
				sem.release(tsk)
			except KeyError:
				# TODO
				pass
			else:
				while sem.waiting and not sem.is_locked():
					# take a frozen task, make it ready to run
					x = sem.waiting.pop()
					self._add_task(x)
    350 
    351 	def get_out(self):
    352 		"""
    353 		Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
    354 		Adds more Tasks if necessary through :py:attr:`waflib.Runner.Parallel.add_more_tasks`.
    355 
    356 		:rtype: :py:attr:`waflib.Task.Task`
    357 		"""
    358 		tsk = self.out.get()
    359 		if not self.stop:
    360 			self.add_more_tasks(tsk)
    361 		self.mark_finished(tsk)
    362 
    363 		self.count -= 1
    364 		self.dirty = True
    365 		return tsk
    366 
    367 	def add_task(self, tsk):
    368 		"""
    369 		Enqueue a Task to :py:attr:`waflib.Runner.Parallel.ready` so that consumers can run them.
    370 
    371 		:param tsk: task instance
    372 		:type tsk: :py:attr:`waflib.Task.Task`
    373 		"""
    374 		# TODO change in waf 2.1
    375 		self.ready.put(tsk)
    376 
	def _add_task(self, tsk):
		# Dispatches one runnable task, honoring task semaphores:
		# if no slot is free, the task is parked until mark_finished releases one
		if hasattr(tsk, 'semaphore'):
			sem = tsk.semaphore
			try:
				sem.acquire(tsk)
			except IndexError:
				# semaphore full: defer the task instead of running it now
				sem.waiting.add(tsk)
				return

		self.count += 1
		self.processed += 1
		if self.numjobs == 1:
			# serial build: execute inline, no consumer threads involved
			tsk.log_display(tsk.generator.bld)
			try:
				self.process_task(tsk)
			finally:
				self.out.put(tsk)
		else:
			self.add_task(tsk)
    396 
    397 	def process_task(self, tsk):
    398 		"""
    399 		Processes a task and attempts to stop the build in case of errors
    400 		"""
    401 		tsk.process()
    402 		if tsk.hasrun != Task.SUCCESS:
    403 			self.error_handler(tsk)
    404 
    405 	def skip(self, tsk):
    406 		"""
    407 		Mark a task as skipped/up-to-date
    408 		"""
    409 		tsk.hasrun = Task.SKIPPED
    410 		self.mark_finished(tsk)
    411 
    412 	def cancel(self, tsk):
    413 		"""
    414 		Mark a task as failed because of unsatisfiable dependencies
    415 		"""
    416 		tsk.hasrun = Task.CANCELED
    417 		self.mark_finished(tsk)
    418 
    419 	def error_handler(self, tsk):
    420 		"""
    421 		Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set,
    422 		unless the build is executed with::
    423 
    424 			$ waf build -k
    425 
    426 		:param tsk: task instance
    427 		:type tsk: :py:attr:`waflib.Task.Task`
    428 		"""
    429 		if not self.bld.keep:
    430 			self.stop = True
    431 		self.error.append(tsk)
    432 
	def task_status(self, tsk):
		"""
		Obtains the task status to decide whether to run it immediately or not.

		:return: the exit status, for example :py:attr:`waflib.Task.ASK_LATER`
		:rtype: integer
		"""
		try:
			return tsk.runnable_status()
		except Exception:
			# runnable_status() raised: store the traceback on the task and
			# handle it like a task failure
			self.processed += 1
			tsk.err_msg = traceback.format_exc()
			if not self.stop and self.bld.keep:
				self.skip(tsk)
				if self.bld.keep == 1:
					# if -k stop on the first exception, if -kk try to go as far as possible
					if Logs.verbose > 1 or not self.error:
						self.error.append(tsk)
					self.stop = True
				else:
					if Logs.verbose > 1:
						self.error.append(tsk)
				return Task.EXCEPTION

			tsk.hasrun = Task.EXCEPTION
			self.error_handler(tsk)

			return Task.EXCEPTION
    461 
	def start(self):
		"""
		Obtains Task instances from the BuildContext instance and adds the ones that need to be executed to
		:py:class:`waflib.Runner.Parallel.ready` so that the :py:class:`waflib.Runner.Spawner` consumer thread
		has them executed. Obtains the executed Tasks back from :py:class:`waflib.Runner.Parallel.out`
		and marks the build as failed by setting the ``stop`` flag.
		If only one job is used, then executes the tasks one by one, without consumers.
		"""
		self.total = self.bld.total()

		while not self.stop:

			self.refill_task_list()

			# consider the next task
			tsk = self.get_next_task()
			if not tsk:
				if self.count:
					# tasks may add new ones after they are run
					continue
				else:
					# no tasks to run, no tasks running, time to exit
					break

			if tsk.hasrun:
				# if the task is marked as "run", just skip it
				self.processed += 1
				continue

			if self.stop: # stop immediately after a failure is detected
				break

			# dispatch according to the task's own readiness report
			st = self.task_status(tsk)
			if st == Task.RUN_ME:
				self._add_task(tsk)
			elif st == Task.ASK_LATER:
				self.postpone(tsk)
			elif st == Task.SKIP_ME:
				self.processed += 1
				self.skip(tsk)
				self.add_more_tasks(tsk)
			elif st == Task.CANCEL_ME:
				# A dependency problem has occurred, and the
				# build is most likely run with `waf -k`
				if Logs.verbose > 1:
					self.error.append(tsk)
				self.processed += 1
				self.cancel(tsk)

		# self.count represents the tasks that have been made available to the consumer threads
		# collect all the tasks after an error else the message may be incomplete
		while self.error and self.count:
			self.get_out()

		# unblock the Spawner thread waiting on the ready queue so it can terminate
		self.ready.put(None)
		if not self.stop:
			assert not self.count
			assert not self.postponed
			assert not self.incomplete
    521 
	def prio_and_split(self, tasks):
		"""
		Label input tasks with priority values, and return a pair containing
		the tasks that are ready to run and the tasks that are necessarily
		waiting for other tasks to complete.

		The priority system is really meant as an optional layer for optimization:
		dependency cycles are found quickly, and builds should be more efficient.
		A high priority number means that a task is processed first.

		This method can be overridden to disable the priority system::

			def prio_and_split(self, tasks):
				return tasks, []

		:return: A pair of task lists
		:rtype: tuple
		"""
		# to disable:
		#return tasks, []
		for x in tasks:
			x.visited = 0

		reverse = self.revdeps

		# populate the reverse dependency graph; TaskGroup barriers are expanded
		# only once (groups_done) as they may appear in many run_after sets
		groups_done = set()
		for x in tasks:
			for k in x.run_after:
				if isinstance(k, Task.TaskGroup):
					if k not in groups_done:
						groups_done.add(k)
						for j in k.prev:
							reverse[j].add(k)
				else:
					reverse[k].add(x)

		# the priority number is not the tree depth
		def visit(n):
			# depth-first accumulation of descendant weights;
			# visited colors: 0 = unvisited, 1 = in progress, 2 = done
			if isinstance(n, Task.TaskGroup):
				return sum(visit(k) for k in n.next)

			if n.visited == 0:
				n.visited = 1

				if n in reverse:
					rev = reverse[n]
					n.prio_order = n.tree_weight + len(rev) + sum(visit(k) for k in rev)
				else:
					n.prio_order = n.tree_weight

				n.visited = 2
			elif n.visited == 1:
				# reached a node that is still being expanded: dependency cycle
				raise Errors.WafError('Dependency cycle found!')
			return n.prio_order

		for x in tasks:
			if x.visited != 0:
				# must visit all to detect cycles
				continue
			try:
				visit(x)
			except Errors.WafError:
				# re-walk the graph to raise an error naming the actual cycle
				self.debug_cycles(tasks, reverse)

		# a task is ready only when all of its run_after predecessors have run
		ready = []
		waiting = []
		for x in tasks:
			for k in x.run_after:
				if not k.hasrun:
					waiting.append(x)
					break
			else:
				ready.append(x)
		return (ready, waiting)
    596 
	def debug_cycles(self, tasks, reverse):
		"""
		Re-walks the reverse dependency graph to raise an error describing
		a minimal dependency cycle (helper for :py:meth:`prio_and_split`).
		"""
		# tmp colors: 0 = unvisited, 1 = on the current path, 2 = done
		tmp = {}
		for x in tasks:
			tmp[x] = 0

		def visit(n, acc):
			if isinstance(n, Task.TaskGroup):
				# groups are transparent: recurse into their successors
				for k in n.next:
					visit(k, acc)
				return
			if tmp[n] == 0:
				tmp[n] = 1
				for k in reverse.get(n, []):
					visit(k, [n] + acc)
				tmp[n] = 2
			elif tmp[n] == 1:
				# back edge found: the accumulated path 'acc' contains the cycle
				lst = []
				for tsk in acc:
					lst.append(repr(tsk))
					if tsk is n:
						# exclude prior nodes, we want the minimum cycle
						break
				raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst))
		for x in tasks:
			visit(x, [])
    622