# mypy: allow-untyped-defs
import argparse
import json
import logging
import os
import re
import subprocess
from collections import OrderedDict
import taskcluster
from . import taskgraph
here = os.path.abspath(os.path.dirname(__file__))
logging.basicConfig()
logger = logging.getLogger()
def get_triggers(event):
    # Determine whether the event is for a pull request and, for branch
    # pushes, which branch received the commits
    ref_prefix = "refs/heads/"
    is_pr = "pull_request" in event
    branch = None
    if not is_pr and "ref" in event:
        branch = event["ref"]
        if branch.startswith(ref_prefix):
            branch = branch[len(ref_prefix):]

    return is_pr, branch

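
# Illustrative examples for get_triggers (event shapes are trimmed-down GitHub
# webhook payloads; the values are made up):
#   get_triggers({"ref": "refs/heads/master", "before": "...", "after": "..."})
#       -> (False, "master")
#   get_triggers({"pull_request": {...}})
#       -> (True, None)
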
def fetch_event_data(queue):
    try:
        task_id = os.environ["TASK_ID"]
    except KeyError:
        logger.warning("Missing TASK_ID environment variable")
        # For example under local testing
        return None

    task_data = queue.task(task_id)

    return task_data.get("extra", {}).get("github_event")

def filter_triggers(event, all_tasks):
    is_pr, branch = get_triggers(event)
    triggered = OrderedDict()
    for name, task in all_tasks.items():
        if "trigger" in task:
            if is_pr and "pull-request" in task["trigger"]:
                triggered[name] = task
            elif branch is not None and "branch" in task["trigger"]:
                for trigger_branch in task["trigger"]["branch"]:
                    if (trigger_branch == branch or
                        trigger_branch.endswith("*") and branch.startswith(trigger_branch[:-1])):
                        triggered[name] = task
    logger.info("Triggers match tasks:\n * %s" % "\n * ".join(triggered.keys()))
    return triggered

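
# Illustrative trigger stanza (a hypothetical task definition; the keys are
# inferred from how filter_triggers reads them and the real definitions live
# in tasks/test.yml):
#   trigger:
#     branch:
#       - master
#       - epochs/*        # a trailing "*" matches any branch with that prefix
#     pull-request: null
# A task with this stanza runs for pull requests, for pushes to master, and
# for pushes to any branch whose name starts with "epochs/".
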
def get_run_jobs(event):
    from tools.ci import jobs
    revish = "%s..%s" % (event["pull_request"]["base"]["sha"]
                         if "pull_request" in event
                         else event["before"],
                         event["pull_request"]["head"]["sha"]
                         if "pull_request" in event
                         else event["after"])
    logger.info("Looking for changes in range %s" % revish)
    paths = jobs.get_paths(revish=revish)
    logger.info("Found changes in paths:\n%s" % "\n".join(paths))

    path_jobs = jobs.get_jobs(paths)
    all_jobs = path_jobs | get_extra_jobs(event)

    logger.info("Including jobs:\n * %s" % "\n * ".join(all_jobs))
    return all_jobs

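
# Illustrative revish values (SHAs are made up): for a pull request event the
# range is "<base sha>..<head sha>", e.g. "1a2b3c..4d5e6f"; for a push event it
# is "<before>..<after>" from the push payload. jobs.get_paths() then lists the
# files changed in that range and jobs.get_jobs() maps them to job names.
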
def get_extra_jobs(event):
    body = None
    jobs = set()
    if "commits" in event and event["commits"]:
        body = event["commits"][0]["message"]
    elif "pull_request" in event:
        body = event["pull_request"]["body"]

    if not body:
        return jobs

    regexp = re.compile(r"\s*tc-jobs:(.*)$")

    for line in body.splitlines():
        m = regexp.match(line)
        if m:
            items = m.group(1)
            for item in items.split(","):
                jobs.add(item.strip())
            break

    return jobs

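
# Illustrative example for get_extra_jobs (the job names are hypothetical): a
# commit message or PR body containing a line such as
#   tc-jobs:lint, wpt_integration
# yields {"lint", "wpt_integration"}. Only the first matching line is used
# because of the break above.
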
def filter_excluded_users(tasks, event):
    # Some users' pull requests are excluded from tasks,
    # such as pull requests from automated exports.
    try:
        submitter = event["pull_request"]["user"]["login"]
    except KeyError:
        # Just ignore the exclusion list if the
        # username cannot be pulled from the event.
        logger.debug("Unable to read username from event. Continuing.")
        return

    excluded_tasks = []
    # Iterate over a copy of the items, because removing entries from the
    # dict while iterating over it would raise an error.
    for name, task in list(tasks.items()):
        if submitter in task.get("exclude-users", []):
            excluded_tasks.append(name)
            tasks.pop(name)  # remove the excluded task in place
    if excluded_tasks:
        logger.info(
            f"Tasks excluded for user {submitter}:\n * " +
            "\n * ".join(excluded_tasks)
        )

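
# Illustrative exclude-users stanza (hypothetical username; the key is inferred
# from the code above):
#   exclude-users:
#     - some-automation-bot
# Pull requests opened by that account are dropped from the scheduled tasks.
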
def filter_schedule_if(event, tasks):
    scheduled = OrderedDict()
    run_jobs = None
    for name, task in tasks.items():
        if "schedule-if" in task:
            if "run-job" in task["schedule-if"]:
                if run_jobs is None:
                    run_jobs = get_run_jobs(event)
                if "all" in run_jobs or any(item in run_jobs for item in task["schedule-if"]["run-job"]):
                    scheduled[name] = task
        else:
            scheduled[name] = task
    logger.info("Scheduling rules match tasks:\n * %s" % "\n * ".join(scheduled.keys()))
    return scheduled

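
# Illustrative schedule-if stanza (hypothetical job name; keys inferred from
# the code above): a task with
#   schedule-if:
#     run-job:
#       - lint
# is only scheduled when get_run_jobs() reports "lint" (or "all") for the
# commit range; tasks without a schedule-if key are always scheduled.
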
def get_fetch_rev(event):
    is_pr, _ = get_triggers(event)
    if is_pr:
        # Try to get the actual rev so that all non-decision tasks are pinned to that
        rv = ["refs/pull/%s/merge" % event["pull_request"]["number"]]
        # For every PR GitHub maintains a 'head' branch with commits from the
        # PR, and a 'merge' branch containing a merge commit between the base
        # branch and the PR.
        for ref_type in ["head", "merge"]:
            ref = "refs/pull/%s/%s" % (event["pull_request"]["number"], ref_type)
            sha = None
            try:
                output = subprocess.check_output(["git", "ls-remote", "origin", ref])
            except subprocess.CalledProcessError:
                import traceback
                logger.error(traceback.format_exc())
                logger.error("Failed to get commit sha1 for %s" % ref)
            else:
                if not output:
                    logger.error("Failed to get commit for %s" % ref)
                else:
                    sha = output.decode("utf-8").split()[0]
            rv.append(sha)
        rv = tuple(rv)
    else:
        # For a branch push we have a ref and a head but no merge SHA
        rv = (event["ref"], event["after"], None)
    assert len(rv) == 3
    return rv

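
# Illustrative return values for get_fetch_rev (SHAs and PR number made up):
#   pull request: ("refs/pull/123/merge", "<head sha>", "<merge sha>")
#   branch push:  ("refs/heads/master", "<pushed sha>", None)
# Either sha may be None if `git ls-remote` fails for the corresponding ref.
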
def build_full_command(event, task):
    fetch_ref, head_sha, merge_sha = get_fetch_rev(event)
    cmd_args = {
        "task_name": task["name"],
        "repo_url": event["repository"]["clone_url"],
        "fetch_ref": fetch_ref,
        "task_cmd": task["command"],
        "install_str": "",
    }

    options = task.get("options", {})
    options_args = []
    options_args.append("--ref=%s" % fetch_ref)
    if head_sha is not None:
        options_args.append("--head-rev=%s" % head_sha)
    if merge_sha is not None:
        options_args.append("--merge-rev=%s" % merge_sha)
    if options.get("oom-killer"):
        options_args.append("--oom-killer")
    if options.get("xvfb"):
        options_args.append("--xvfb")
    if not options.get("hosts"):
        options_args.append("--no-hosts")
    else:
        options_args.append("--hosts")
    # Check out the expected SHA unless it is overridden (e.g. to base_head).
    if options.get("checkout"):
        options_args.append("--checkout=%s" % options["checkout"])
    for browser in options.get("browser", []):
        options_args.append("--browser=%s" % browser)
    if options.get("channel"):
        options_args.append("--channel=%s" % options["channel"])
    if options.get("install-certificates"):
        options_args.append("--install-certificates")

    cmd_args["options_str"] = " ".join(str(item) for item in options_args)

    install_packages = task.get("install")
    if install_packages:
        install_items = ["apt update -qqy"]
        install_items.extend("apt install -qqy %s" % item
                             for item in install_packages)
        cmd_args["install_str"] = "\n".join("sudo %s;" % item for item in install_items)

    return ["/bin/bash",
            "--login",
            "-xc",
            """
~/start.sh \
  %(repo_url)s \
  %(fetch_ref)s;
%(install_str)s
cd web-platform-tests;
./tools/ci/run_tc.py %(options_str)s -- %(task_cmd)s;
""" % cmd_args]

def get_owner(event):
    if "pusher" in event:
        pusher = event.get("pusher", {}).get("email", "")
        if pusher and "@" in pusher:
            return pusher
    return "web-platform-tests@users.noreply.github.com"

def create_tc_task(event, task, taskgroup_id, depends_on_ids, env_extra=None):
    command = build_full_command(event, task)
    task_id = taskcluster.slugId()
    task_data = {
        "taskGroupId": taskgroup_id,
        "created": taskcluster.fromNowJSON(""),
        "deadline": taskcluster.fromNowJSON(task["deadline"]),
        "provisionerId": task["provisionerId"],
        "schedulerId": task["schedulerId"],
        "workerType": task["workerType"],
        "scopes": task.get("scopes", []),
        "metadata": {
            "name": task["name"],
            "description": task.get("description", ""),
            "owner": get_owner(event),
            "source": event["repository"]["clone_url"]
        },
        "payload": {
            "artifacts": task.get("artifacts"),
            "command": command,
            "image": task.get("image"),
            "maxRunTime": task.get("maxRunTime"),
            "env": task.get("env", {}),
        },
        "extra": {
            "github_event": json.dumps(event)
        },
        "routes": ["checks"]
    }
    if "extra" in task:
        task_data["extra"].update(task["extra"])
    if task.get("privileged"):
        if "capabilities" not in task_data["payload"]:
            task_data["payload"]["capabilities"] = {}
        task_data["payload"]["capabilities"]["privileged"] = True
    if env_extra:
        task_data["payload"]["env"].update(env_extra)
    if depends_on_ids:
        task_data["dependencies"] = depends_on_ids
        task_data["requires"] = task.get("requires", "all-completed")
    return task_id, task_data

def get_artifact_data(artifact, task_id_map):
    task_id, data = task_id_map[artifact["task"]]
    return {
        "task": task_id,
        "glob": artifact["glob"],
        "dest": artifact["dest"],
        "extract": artifact.get("extract", False)
    }

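
# Illustrative download-artifacts entry (hypothetical task name and paths; keys
# inferred from the code above):
#   download-artifacts:
#     - task: some-upstream-task
#       glob: public/results/*.json
#       dest: results/
#       extract: true
# get_artifact_data() swaps the task name for the generated task id so the
# worker knows which task to fetch the artifact from.
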
def build_task_graph(event, all_tasks, tasks):
    task_id_map = OrderedDict()
    taskgroup_id = os.environ.get("TASK_ID", taskcluster.slugId())
    sink_task_depends_on = []

    def add_task(task_name, task):
        depends_on_ids = []
        if "depends-on" in task:
            for depends_name in task["depends-on"]:
                if depends_name not in task_id_map:
                    add_task(depends_name,
                             all_tasks[depends_name])
                depends_on_ids.append(task_id_map[depends_name][0])

        env_extra = {}
        if "download-artifacts" in task:
            env_extra["TASK_ARTIFACTS"] = json.dumps(
                [get_artifact_data(artifact, task_id_map)
                 for artifact in task["download-artifacts"]])

        task_id, task_data = create_tc_task(event, task, taskgroup_id, depends_on_ids,
                                            env_extra=env_extra)
        task_id_map[task_name] = (task_id, task_data)

        # The str() conversion is needed because values that come from
        # variables in the task definition are substituted as strings, so it
        # is simplest to always compare strings.
        if str(task.get("required", "True")) != "False" and task_name != "sink-task":
            sink_task_depends_on.append(task_id)

    for task_name, task in tasks.items():
        if task_name == "sink-task":
            # sink-task is created below, after all other tasks,
            # so that it can depend on every one of them.
            continue
        add_task(task_name, task)

    # GitHub branch protection for pull requests requires naming explicit
    # required checks, which is not possible with a dynamically generated task
    # graph. To work around this we declare a sink task that depends on all
    # the other tasks completing and checks whether they succeeded; the sink
    # task can then be the sole required check for pull requests.
    sink_task = tasks.get("sink-task")
    if sink_task:
        logger.info("Scheduling sink-task")
        sink_task["command"] += " {}".format(" ".join(sink_task_depends_on))
        task_id_map["sink-task"] = create_tc_task(
            event, sink_task, taskgroup_id, sink_task_depends_on)
    else:
        logger.info("sink-task is not scheduled")

    return task_id_map

def create_tasks(queue, task_id_map):
    for (task_id, task_data) in task_id_map.values():
        queue.createTask(task_id, task_data)

def get_event(queue, event_path):
    if event_path is not None:
        try:
            with open(event_path) as f:
                event_str = f.read()
        except OSError:
            logger.error("Missing event file at path %s" % event_path)
            raise
    elif "TASK_EVENT" in os.environ:
        event_str = os.environ["TASK_EVENT"]
    else:
        event_str = fetch_event_data(queue)
    if not event_str:
        raise ValueError("Can't find GitHub event definition; for local testing pass --event-path")
    try:
        return json.loads(event_str)
    except ValueError:
        logger.error("Event was not valid JSON")
        raise

def decide(event):
    all_tasks = taskgraph.load_tasks_from_path(os.path.join(here, "tasks", "test.yml"))

    triggered_tasks = filter_triggers(event, all_tasks)
    scheduled_tasks = filter_schedule_if(event, triggered_tasks)
    filter_excluded_users(scheduled_tasks, event)

    logger.info("UNSCHEDULED TASKS:\n %s" % "\n ".join(sorted(set(all_tasks.keys()) -
                                                              set(scheduled_tasks.keys()))))
    logger.info("SCHEDULED TASKS:\n %s" % "\n ".join(sorted(scheduled_tasks.keys())))

    task_id_map = build_task_graph(event, all_tasks, scheduled_tasks)
    return task_id_map

def get_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("--event-path",
                        help="Path to file containing serialized GitHub event")
    parser.add_argument("--dry-run", action="store_true",
                        help="Don't actually create the tasks, just output the tasks that "
                             "would be created")
    parser.add_argument("--tasks-path",
                        help="Path to file in which to write payload for all scheduled tasks")
    return parser

def run(venv, **kwargs):
    queue = taskcluster.Queue({'rootUrl': os.environ['TASKCLUSTER_PROXY_URL']})
    event = get_event(queue, event_path=kwargs["event_path"])

    task_id_map = decide(event)

    try:
        if not kwargs["dry_run"]:
            create_tasks(queue, task_id_map)
        else:
            print(json.dumps(task_id_map, indent=2))
    finally:
        if kwargs["tasks_path"]:
            with open(kwargs["tasks_path"], "w") as f:
                json.dump(task_id_map, f, indent=2)
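
# run() is normally invoked by the surrounding tooling with the parsed
# command-line arguments passed as keyword arguments; a rough local dry run
# (argument names from get_parser above, command wiring assumed) would look
# like:
#   run(None, event_path="event.json", dry_run=True, tasks_path=None)
# with TASKCLUSTER_PROXY_URL set in the environment.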