Source code

Revision control

Copy as Markdown

Other Tools

import logging
from datetime import datetime
from taskgraph.optimize.base import OptimizationStrategy, register_strategy
from taskgraph.util.path import match as match_path
from taskgraph.util.taskcluster import find_task_id, status_task
logger = logging.getLogger("optimization")
@register_strategy("index-search")
class IndexSearch(OptimizationStrategy):
# A task with no dependencies remaining after optimization will be replaced
# if artifacts exist for the corresponding index_paths.
# Otherwise, we're in one of the following cases:
# - the task has un-optimized dependencies
# - the artifacts have expired
# - some changes altered the index_paths and new artifacts need to be
# created.
# In every of those cases, we need to run the task to create or refresh
# artifacts.
fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
def should_replace_task(self, task, params, deadline, arg):
"Look for a task with one of the given index paths"
batched = False
# Appease static checker that doesn't understand that this is not needed
label_to_taskid = {}
taskid_to_status = {}
if isinstance(arg, tuple) and len(arg) == 3:
# allow for a batched call optimization instead of two queries
# per index path
index_paths, label_to_taskid, taskid_to_status = arg
batched = True
else:
index_paths = arg
for index_path in index_paths:
try:
if batched:
task_id = label_to_taskid[index_path]
status = taskid_to_status[task_id]
else:
# 404 is raised as `KeyError` also end up here
task_id = find_task_id(index_path)
status = status_task(task_id)
# status can be `None` if we're in `testing` mode
# (e.g. test-action-callback)
if not status or status.get("state") in ("exception", "failed"):
logger.debug(
f"not replacing {task.label} with {task_id} because it is in failed or exception state"
)
continue
if deadline and datetime.strptime(
status["expires"], self.fmt
) < datetime.strptime(deadline, self.fmt):
logger.debug(
f"not replacing {task.label} with {task_id} because it expires before {deadline}"
)
continue
return task_id
except KeyError:
# go on to the next index path
pass
return False
@register_strategy("skip-unless-changed")
class SkipUnlessChanged(OptimizationStrategy):
def check(self, files_changed, patterns):
for pattern in patterns:
for path in files_changed:
if match_path(path, pattern):
return True
return False
def should_remove_task(self, task, params, file_patterns):
# skip-unless-changed should not apply when there is no commit delta,
# such as for cron and action tasks (there will never be file changes)
if params.get("base_rev") and params.get("head_rev") == params.get("base_rev"):
return False
changed = self.check(params["files_changed"], file_patterns)
if not changed:
logger.debug(
f'no files found matching a pattern in `skip-unless-changed` for "{task.label}"'
)
return True
return False