# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import argparse
import atexit
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
import traceback
from collections import namedtuple
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from textwrap import dedent
from typing import Any, List
from urllib.parse import urlparse
import appdirs
import yaml
# Registry of CLI subcommands. Each entry maps a subcommand name to a
# Command record: `func` is the handler, `args`/`kwargs` are forwarded to
# `subparsers.add_parser`, and `defaults` is applied via `set_defaults`.
Command = namedtuple("Command", ["func", "args", "kwargs", "defaults"])
commands = {}
def command(*args, **kwargs):
    """Register the decorated function as a CLI subcommand.

    All positional and keyword arguments are forwarded to
    ``subparsers.add_parser``; the special ``defaults`` keyword is popped
    off and later applied with ``set_defaults``.
    """
    defaults = kwargs.pop("defaults", {})

    def register(func):
        # The first positional argument is the subcommand's name.
        commands[args[0]] = Command(func, args, kwargs, defaults)
        return func

    return register
def argument(*args, **kwargs):
    """Attach an argparse argument spec to the decorated command handler.

    Specs accumulate on ``func.args`` and are later fed to
    ``subparser.add_argument``.
    """

    def attach(func):
        arg_specs = getattr(func, "args", None)
        if arg_specs is None:
            arg_specs = func.args = []
        arg_specs.append((args, kwargs))
        return func

    return attach
def format_taskgraph_labels(taskgraph):
    """Render the graph as one task label per line, sorted alphabetically."""
    labels = [
        taskgraph.tasks[node].label
        for node in taskgraph.graph.visit_postorder()
    ]
    labels.sort()
    return "\n".join(labels)
def format_taskgraph_json(taskgraph):
    """Render the graph as pretty-printed JSON with sorted keys."""
    from taskgraph.util import json

    graph_json = taskgraph.to_json()
    return json.dumps(graph_json, sort_keys=True, indent=2)
def format_taskgraph_yaml(taskgraph):
    """Render the graph as YAML in block (non-flow) style."""
    graph_json = taskgraph.to_json()
    return yaml.safe_dump(graph_json, default_flow_style=False)
def get_filtered_taskgraph(taskgraph, tasksregex, exclude_keys):
    """
    Filter all the tasks on basis of a regular expression
    and returns a new TaskGraph object
    """
    from taskgraph.graph import Graph
    from taskgraph.task import Task
    from taskgraph.taskgraph import TaskGraph

    if tasksregex:
        named_links_dict = taskgraph.graph.named_links_dict()
        filteredtasks = {}
        filterededges = set()
        regexprogram = re.compile(tasksregex)
        for key in taskgraph.graph.visit_postorder():
            task = taskgraph.tasks[key]
            if regexprogram.match(task.label):
                filteredtasks[key] = task
                # Keep only edges whose dependency also matches the regex,
                # so the resulting graph stays self-consistent.
                for depname, dep in named_links_dict[key].items():
                    if regexprogram.match(dep):
                        filterededges.add((key, dep, depname))
        taskgraph = TaskGraph(
            filteredtasks, Graph(frozenset(filteredtasks), frozenset(filterededges))
        )

    if exclude_keys:
        for label, task in taskgraph.tasks.items():
            # Work on the JSON form so dotted keys can be deleted, then
            # rebuild the Task from the pruned dict.
            task = task.to_json()
            for key in exclude_keys:
                obj = task
                attrs = key.split(".")
                # Walk down the dotted path; delete the leaf when reached.
                # The `break` exits only this while loop, not the key loop.
                while obj and attrs[0] in obj:
                    if len(attrs) == 1:
                        del obj[attrs[0]]
                        break
                    obj = obj[attrs[0]]
                    attrs = attrs[1:]
            taskgraph.tasks[label] = Task.from_json(task)

    return taskgraph
# Dispatch table mapping the --format option value to a renderer function.
FORMAT_METHODS = {
    "labels": format_taskgraph_labels,
    "json": format_taskgraph_json,
    "yaml": format_taskgraph_yaml,
}
def get_taskgraph_generator(root, parameters):
    """Helper function to make testing a little easier."""
    from taskgraph.generator import TaskGraphGenerator

    tgg = TaskGraphGenerator(root_dir=root, parameters=parameters)
    return tgg
def format_taskgraph(options, parameters, overrides, logfile=None):
    """Generate a task graph and render it with the configured formatter.

    Runs (possibly in a worker process) for a single parameters spec.
    When `logfile` is given, root logging is redirected there so parallel
    generations don't interleave on stderr.
    """
    import taskgraph
    from taskgraph.parameters import parameters_loader

    if logfile:
        handler = logging.FileHandler(logfile, mode="w")
        # Swap the most recent root handler for the file handler, keeping
        # its formatter so log lines look the same as on the console.
        if logging.root.handlers:
            oldhandler = logging.root.handlers[-1]
            logging.root.removeHandler(oldhandler)
            handler.setFormatter(oldhandler.formatter)
        logging.root.addHandler(handler)

    if options["fast"]:
        taskgraph.fast = True

    # A string spec (file path / url / project=...) must be resolved into
    # a Parameters loader; an already-built loader is used as-is.
    if isinstance(parameters, str):
        parameters = parameters_loader(
            parameters,
            overrides=overrides,
            strict=False,
        )

    tgg = get_taskgraph_generator(options.get("root"), parameters)

    # graph_attr selects which stage of the graph to show
    # (full_task_set, target_task_graph, optimized_task_graph, ...).
    tg = getattr(tgg, options["graph_attr"])
    tg = get_filtered_taskgraph(tg, options["tasks_regex"], options["exclude_keys"])
    format_method = FORMAT_METHODS[options["format"] or "labels"]
    return format_method(tg)
def dump_output(out, path=None, params_spec=None):
    """Write generated output to *path*, or to stdout when no path is given.

    Args:
        out: The rendered taskgraph text to dump.
        path: Optional output file path; may contain a ``{params}``
            placeholder. When ``params_spec`` is set and the placeholder is
            absent, ``_{params}`` is appended before the extension so
            multiple specs don't overwrite each other.
        params_spec: Parameters specification used for this generation;
            only used to derive the substituted file name / console banner.
    """
    from taskgraph.parameters import Parameters

    params_name = Parameters.format_spec(params_spec)

    if path:
        # Substitute params name into file path if necessary
        if params_spec and "{params}" not in path:
            name, ext = os.path.splitext(path)
            name += "_{params}"
            path = name + ext
        path = path.format(params=params_name)
        # Bug fix: use a context manager so the file handle is always
        # closed (it was previously opened and never explicitly closed).
        with open(path, "w") as fh:
            print(out + "\n", file=fh)
    else:
        print(
            f"Dumping result with parameters from {params_name}:",
            file=sys.stderr,
        )
        # file=None means stdout, matching the original behaviour.
        print(out + "\n", file=None)
def generate_taskgraph(options, parameters, overrides, logdir):
    """Generate (and dump) one task graph per parameters spec.

    Uses a process pool when multiple specs are given. Returns 0 on
    success, 1 if any generation raised an exception.
    """
    from taskgraph.parameters import Parameters

    def logfile(spec):
        """Determine logfile given a parameters specification."""
        if logdir is None:
            return None
        return os.path.join(
            logdir,
            "{}_{}.log".format(options["graph_attr"], Parameters.format_spec(spec)),
        )

    # Don't bother using futures if there's only one parameter. This can make
    # tracebacks a little more readable and avoids additional process overhead.
    if len(parameters) == 1:
        spec = parameters[0]
        out = format_taskgraph(options, spec, overrides, logfile(spec))
        dump_output(out, options["output_file"])
        return 0

    futures = {}
    with ProcessPoolExecutor(max_workers=options["max_workers"]) as executor:
        for spec in parameters:
            f = executor.submit(
                format_taskgraph, options, spec, overrides, logfile(spec)
            )
            futures[f] = spec

    returncode = 0
    for future in as_completed(futures):
        output_file = options["output_file"]
        spec = futures[future]
        e = future.exception()
        if e:
            returncode = 1
            # Render the worker's traceback so the failure is visible.
            out = "".join(traceback.format_exception(type(e), e, e.__traceback__))
            if options["diff"]:
                # Dump to console so we don't accidentally diff the tracebacks.
                output_file = None
        else:
            out = future.result()

        dump_output(
            out,
            path=output_file,
            params_spec=spec if len(parameters) > 1 else None,
        )

    return returncode
@command(
    "tasks",
    help="Show the full task set in the task graph. The full task set includes all tasks defined by any kind, without edges (dependencies) between them.",
    defaults={"graph_attr": "full_task_set"},
)
@command(
    "full",
    help="Show the full task graph. The full task graph consists of the full task set, with edges (dependencies) between tasks.",
    defaults={"graph_attr": "full_task_graph"},
)
@command(
    "target",
    help="Show the target task set in the task graph. The target task set includes the tasks which have indicated they should be run, without edges (dependencies) between them.",
    defaults={"graph_attr": "target_task_set"},
)
@command(
    "target-graph",
    help="Show the target task graph. The target task graph consists of the target task set, with edges (dependencies) between tasks.",
    defaults={"graph_attr": "target_task_graph"},
)
@command(
    "optimized",
    help="Show the optimized task graph, which is the target task set with tasks optimized out (filtered, omitted, or replaced) and edges representing dependencies.",
    defaults={"graph_attr": "optimized_task_graph"},
)
@command(
    "morphed",
    help="Show the morphed graph, which is the optimized task graph with additional morphs applied. It retains the same meaning as the optimized task graph but in a form more palatable to TaskCluster.",
    defaults={"graph_attr": "morphed_task_graph"},
)
@argument("--root", "-r", help="root of the taskgraph definition relative to topsrcdir")
@argument("--quiet", "-q", action="store_true", help="suppress all logging output")
@argument(
    "--verbose", "-v", action="store_true", help="include debug-level logging output"
)
@argument(
    "--json",
    "-J",
    action="store_const",
    dest="format",
    const="json",
    help="Output task graph as a JSON object",
)
@argument(
    "--yaml",
    "-Y",
    action="store_const",
    dest="format",
    const="yaml",
    help="Output task graph as a YAML object",
)
@argument(
    "--labels",
    "-L",
    action="store_const",
    dest="format",
    const="labels",
    help="Output the label for each task in the task graph (default)",
)
@argument(
    "--parameters",
    "-p",
    default=None,
    action="append",
    help="Parameters to use for the generation. Can be a path to file (.yml or "
    ".json; see `taskcluster/docs/parameters.rst`), a directory (containing "
    "parameters files), a url, of the form `project=mozilla-central` to download "
    "latest parameters file for the specified project from CI, or of the form "
    "`task-id=<decision task id>` to download parameters from the specified "
    "decision task. Can be specified multiple times, in which case multiple "
    "generations will happen from the same invocation (one per parameters "
    "specified).",
)
@argument(
    "--force-local-files-changed",
    default=False,
    action="store_true",
    help="Compute the 'files-changed' parameter from local version control, "
    "even when explicitly using a parameter set that already has it defined. "
    "Note that this is already the default behaviour when no parameters are "
    "specified.",
)
@argument(
    "--no-optimize",
    dest="optimize",
    action="store_false",
    # Bug fix: this was `default="true"` (a string). Any non-empty string is
    # truthy so behaviour happened to be correct, but the boolean True is the
    # right type for a store_false flag's default.
    default=True,
    help="do not remove tasks from the graph that are found in the "
    "index (a.k.a. optimize the graph)",
)
@argument(
    "-o",
    "--output-file",
    default=None,
    help="file path to store generated output.",
)
@argument(
    "--tasks-regex",
    "--tasks",
    default=None,
    help="only return tasks with labels matching this regular expression.",
)
@argument(
    "--exclude-key",
    default=None,
    dest="exclude_keys",
    action="append",
    help="Exclude the specified key (using dot notation) from the final result. "
    "This is mainly useful with '--diff' to filter out expected differences. Can be "
    "used multiple times.",
)
@argument(
    "-k",
    "--target-kind",
    dest="target_kinds",
    action="append",
    default=[],
    help="only return tasks that are of the given kind, or their dependencies.",
)
@argument(
    "-F",
    "--fast",
    default=False,
    action="store_true",
    help="enable fast task generation for local debugging.",
)
@argument(
    "--diff",
    const="default",
    nargs="?",
    default=None,
    help="Generate and diff the current taskgraph against another revision. "
    "Without args the base revision will be used. A revision specifier such as "
    "the hash or `.~1` (hg) or `HEAD~1` (git) can be used as well.",
)
@argument(
    "-j",
    "--max-workers",
    dest="max_workers",
    default=None,
    type=int,
    help="The maximum number of workers to use for parallel operations such as"
    "when multiple parameters files are passed.",
)
def show_taskgraph(options):
    """Entry point for the graph-display subcommands (tasks/full/target/...).

    Generates the requested stage of the task graph for each parameters
    spec and dumps it; with --diff, also generates the graph at a base
    revision and prints a unified diff between the two.
    """
    from taskgraph.parameters import Parameters, parameters_loader
    from taskgraph.util.vcs import get_repository

    if options.pop("verbose", False):
        logging.root.setLevel(logging.DEBUG)

    repo = None
    cur_rev = None
    diffdir = None
    output_file = options["output_file"]

    if options["diff"] or options["force_local_files_changed"]:
        repo = get_repository(os.getcwd())

    if options["diff"]:
        assert repo is not None
        if not repo.working_directory_clean():
            print(
                "abort: can't diff taskgraph with dirty working directory",
                file=sys.stderr,
            )
            return 1

        # We want to return the working directory to the current state
        # as best we can after we're done. In all known cases, using
        # branch or bookmark (which are both available on the VCS object)
        # as `branch` is preferable to a specific revision.
        cur_rev = repo.branch or repo.head_rev[:12]
        cur_rev_file = cur_rev.replace("/", "_")

        diffdir = tempfile.mkdtemp()
        atexit.register(
            shutil.rmtree, diffdir
        )  # make sure the directory gets cleaned up
        options["output_file"] = os.path.join(
            diffdir, f"{options['graph_attr']}_{cur_rev_file}"
        )
        print(f"Generating {options['graph_attr']} @ {cur_rev}", file=sys.stderr)

    overrides = {
        "target-kinds": options.get("target_kinds"),
    }
    parameters: List[Any[str, Parameters]] = options.pop("parameters")
    if not parameters:
        parameters = [
            parameters_loader(None, strict=False, overrides=overrides)
        ]  # will use default values

        # This is the default behaviour anyway, so no need to re-compute.
        options["force_local_files_changed"] = False

    elif options["force_local_files_changed"]:
        assert repo is not None
        overrides["files-changed"] = sorted(repo.get_changed_files("AM"))

    # Expand any directory specs into the parameter files they contain.
    for param in parameters[:]:
        if isinstance(param, str) and os.path.isdir(param):
            parameters.remove(param)
            parameters.extend(
                [
                    p.as_posix()
                    for p in Path(param).iterdir()
                    if p.suffix in (".yml", ".json")
                ]
            )

    logdir = None
    if len(parameters) > 1:
        # Log to separate files for each process instead of stderr to
        # avoid interleaving.
        basename = os.path.basename(os.getcwd())
        logdir = os.path.join(appdirs.user_log_dir("taskgraph"), basename)
        if not os.path.isdir(logdir):
            os.makedirs(logdir)
    else:
        # Only setup logging if we have a single parameter spec. Otherwise
        # logging will go to files. This is also used as a hook for Gecko
        # to setup its `mach` based logging.
        setup_logging()

    ret = generate_taskgraph(options, parameters, overrides, logdir)

    if options["diff"]:
        assert diffdir is not None
        assert repo is not None

        # Reload taskgraph modules to pick up changes and clear global state.
        for mod in sys.modules.copy():
            if mod != __name__ and mod.split(".", 1)[0].endswith("taskgraph"):
                del sys.modules[mod]

        if options["diff"] == "default":
            base_rev = repo.base_rev
        else:
            base_rev = options["diff"]
        base_rev_file = base_rev.replace("/", "_")

        try:
            repo.update(base_rev)
            base_rev = repo.head_rev[:12]
            options["output_file"] = os.path.join(
                diffdir, f"{options['graph_attr']}_{base_rev_file}"
            )
            print(f"Generating {options['graph_attr']} @ {base_rev}", file=sys.stderr)
            ret |= generate_taskgraph(options, parameters, overrides, logdir)
        finally:
            # Always restore the working directory to where it started.
            assert cur_rev
            repo.update(cur_rev)

        # Generate diff(s)
        diffcmd = [
            "diff",
            "-U20",
            "--report-identical-files",
            f"--label={options['graph_attr']}@{base_rev}",
            f"--label={options['graph_attr']}@{cur_rev}",
        ]

        non_fatal_failures = []
        for spec in parameters:
            base_path = os.path.join(
                diffdir, f"{options['graph_attr']}_{base_rev_file}"
            )
            cur_path = os.path.join(diffdir, f"{options['graph_attr']}_{cur_rev_file}")  # type: ignore

            params_name = None
            if len(parameters) > 1:
                params_name = Parameters.format_spec(spec)
                base_path += f"_{params_name}"
                cur_path += f"_{params_name}"

            # If the base or cur files are missing it means that generation
            # failed. If one of them failed but not the other, the failure is
            # likely due to the patch making changes to taskgraph in modules
            # that don't get reloaded (safe to ignore). If both generations
            # failed, there's likely a real issue.
            base_missing = not os.path.isfile(base_path)
            cur_missing = not os.path.isfile(cur_path)
            if base_missing != cur_missing:  # != is equivalent to XOR for booleans
                non_fatal_failures.append(os.path.basename(base_path))
                continue

            try:
                # If the output file(s) are missing, this command will raise
                # CalledProcessError with a returncode > 1.
                proc = subprocess.run(
                    diffcmd + [base_path, cur_path],
                    capture_output=True,
                    text=True,
                    check=True,
                )
                diff_output = proc.stdout
                returncode = 0
            except subprocess.CalledProcessError as e:
                # returncode 1 simply means diffs were found
                if e.returncode != 1:
                    print(e.stderr, file=sys.stderr)
                    raise
                diff_output = e.output
                returncode = e.returncode

            dump_output(
                diff_output,
                # Don't bother saving file if no diffs were found. Log to
                # console in this case instead.
                path=None if returncode == 0 else output_file,
                params_spec=spec if len(parameters) > 1 else None,
            )

        if non_fatal_failures:
            failstr = "\n ".join(sorted(non_fatal_failures))
            print(
                "WARNING: Diff skipped for the following generation{s} "
                "due to failures:\n {failstr}".format(
                    s="s" if len(non_fatal_failures) > 1 else "", failstr=failstr
                ),
                file=sys.stderr,
            )

        if options["format"] != "json":
            print(
                "If you were expecting differences in task bodies "
                'you should pass "-J"\n',
                file=sys.stderr,
            )

    if len(parameters) > 1:
        print(f"See '{logdir}' for logs", file=sys.stderr)

    return ret
@command("build-image", help="Build a Docker image")
@argument("image_name", help="Name of the image to build")
@argument(
    "-t", "--tag", help="tag that the image should be built as.", metavar="name:tag"
)
@argument(
    "--context-only",
    help="File name the context tarball should be written to."
    "with this option it will only build the context.tar.",
    metavar="context.tar",
)
def build_image(args):
    """Build a docker image, or only its context tarball with --context-only."""
    from taskgraph.docker import build_context, build_image

    validate_docker()
    context_only = args["context_only"]
    if context_only is None:
        build_image(args["image_name"], args["tag"], os.environ)
    else:
        build_context(args["image_name"], context_only, os.environ)
@command(
    "load-image",
    help="Load a pre-built Docker image. Note that you need to "
    "have docker installed and running for this to work.",
)
@argument(
    "--task-id",
    help="Load the image at public/image.tar.zst in this task, "
    "rather than searching the index",
)
@argument(
    "-t",
    "--tag",
    help="tag that the image should be loaded as. If not "
    "image will be loaded with tag from the tarball",
    metavar="name:tag",
)
@argument(
    "image_name",
    nargs="?",
    help="Load the image of this name based on the current "
    "contents of the tree (as built for mozilla-central "
    "or mozilla-inbound)",
)
def load_image(args):
    """Load a pre-built docker image either by task id or by in-tree name."""
    from taskgraph.docker import load_image_by_name, load_image_by_task_id

    # One of the two selectors is required.
    if not (args.get("image_name") or args.get("task_id")):
        print("Specify either IMAGE-NAME or TASK-ID")
        sys.exit(1)
    validate_docker()
    try:
        task_id = args["task_id"]
        if task_id:
            tag = load_image_by_task_id(task_id, args.get("tag"))
        else:
            tag = load_image_by_name(args["image_name"], args.get("tag"))
        if not tag:
            sys.exit(1)
    except Exception:
        traceback.print_exc()
        sys.exit(1)
def validate_docker():
    """Exit with status 1 if the Docker daemon is not reachable.

    Runs ``docker ps`` as a connectivity probe and prints the captured
    stderr on failure.
    """
    # Bug fix: text=True so p.stderr is a str. Previously the bytes repr
    # (b'...') was printed, which is unreadable for users.
    p = subprocess.run(["docker", "ps"], capture_output=True, text=True)
    if p.returncode != 0:
        print("Error connecting to Docker:", p.stderr)
        sys.exit(1)
@command("image-digest", help="Print the digest of a docker image.")
@argument(
    "image_name",
    help="Print the digest of the image of this name based on the current "
    "contents of the tree.",
)
def image_digest(args):
    """Look up and print the content digest of the named in-tree image."""
    from taskgraph.docker import get_image_digest

    try:
        print(get_image_digest(args["image_name"]))
    except Exception:
        traceback.print_exc()
        sys.exit(1)
@command(
    "load-task",
    help="Loads a pre-built Docker image and drops you into a container with "
    "the same environment variables and run-task setup as the specified task. "
    "The task's payload.command will be replaced with 'bash'. You need to have "
    "docker installed and running for this to work.",
)
@argument("task_id", help="The task id to load into a docker container.")
@argument(
    "--keep",
    dest="remove",
    action="store_false",
    default=True,
    help="Keep the docker container after exiting.",
)
@argument("--user", default=None, help="Container user to start shell with.")
def load_task(args):
    """Start an interactive container replicating the given task's setup."""
    from taskgraph.docker import load_task

    validate_docker()
    task_id = args["task_id"]
    return load_task(task_id, remove=args["remove"], user=args["user"])
@command("decision", help="Run the decision task")
@argument("--root", "-r", help="root of the taskgraph definition relative to topsrcdir")
@argument(
    "--message",
    required=False,
    help=argparse.SUPPRESS,
)
@argument(
    "--project",
    required=True,
    help="Project to use for creating task graph. Example: --project=try",
)
@argument("--pushlog-id", dest="pushlog_id", required=True, default="0")
@argument("--pushdate", dest="pushdate", required=True, type=int, default=0)
@argument("--owner", required=True, help="email address of who owns this graph")
@argument("--level", required=True, help="SCM level of this repository")
@argument(
    "--target-tasks-method", help="method for selecting the target tasks to generate"
)
@argument(
    "--repository-type",
    required=True,
    help='Type of repository, either "hg" or "git"',
)
@argument("--base-repository", required=True, help='URL for "base" repository to clone')
@argument(
    "--base-ref", default="", help='Reference of the revision in the "base" repository'
)
@argument(
    "--base-rev",
    default="",
    help="Taskgraph decides what to do based on the revision range between "
    "`--base-rev` and `--head-rev`. Value is determined automatically if not provided",
)
@argument(
    "--head-repository",
    required=True,
    help='URL for "head" repository to fetch revision from',
)
@argument(
    "--head-ref", required=True, help="Reference (this is same as rev usually for hg)"
)
@argument(
    "--head-rev", required=True, help="Commit revision to use from head repository"
)
@argument("--head-tag", help="Tag attached to the revision", default="")
@argument(
    "--tasks-for", required=True, help="the tasks_for value used to generate this task"
)
@argument("--try-task-config-file", help="path to try task configuration file")
@argument(
    "--verbose", "-v", action="store_true", help="include debug-level logging output"
)
def decision(options):
    """Run the in-CI decision task: delegate entirely to taskgraph_decision."""
    from taskgraph.decision import taskgraph_decision

    taskgraph_decision(options)
@command("actions", help="Print the rendered actions.json")
@argument(
    "--root",
    "-r",
    help="root of the taskgraph definition relative to topsrcdir",
    default="taskcluster",
)
@argument(
    "--verbose",
    "-v",
    action="store_true",
    help="include debug-level logging output",
)
@argument(
    "--parameters",
    "-p",
    default="",
    help="parameters file (.yml or .json; see `taskcluster/docs/parameters.rst`)`",
)
def actions(args):
    """Render actions.json for the given parameters and print it as JSON.

    Exits with status 1 (after printing the traceback) on any failure.
    """
    from taskgraph.actions import render_actions_json
    from taskgraph.generator import TaskGraphGenerator
    from taskgraph.parameters import parameters_loader
    from taskgraph.util import json

    if args.pop("verbose", False):
        logging.root.setLevel(logging.DEBUG)
    try:
        parameters = parameters_loader(args["parameters"], strict=False)
        tgg = TaskGraphGenerator(root_dir=args.get("root"), parameters=parameters)
        # "DECISION-TASK" is a placeholder decision task id for local rendering.
        actions = render_actions_json(tgg.parameters, tgg.graph_config, "DECISION-TASK")
        print(json.dumps(actions, sort_keys=True, indent=2))
    except Exception:
        traceback.print_exc()
        sys.exit(1)
    return 0
@command("action-callback", description="Run action callback used by action tasks")
@argument(
    "--root",
    "-r",
    default="taskcluster",
    help="root of the taskgraph definition relative to topsrcdir",
)
def action_callback(options):
    """Run an action callback inside an action task.

    All action inputs are read from ACTION_* environment variables set by
    the action task definition. Exits 1 on any failure.
    """
    from taskgraph.actions import trigger_action_callback
    from taskgraph.actions.util import get_parameters
    from taskgraph.util import json

    try:
        # the target task for this action (or null if it's a group action)
        task_id = json.loads(os.environ.get("ACTION_TASK_ID", "null"))
        # the target task group for this action
        task_group_id = os.environ.get("ACTION_TASK_GROUP_ID", None)
        input = json.loads(os.environ.get("ACTION_INPUT", "null"))
        callback = os.environ.get("ACTION_CALLBACK", None)
        root = options["root"]

        parameters = get_parameters(task_group_id)

        return trigger_action_callback(
            task_group_id=task_group_id,
            task_id=task_id,
            input=input,
            callback=callback,
            parameters=parameters,
            root=root,
            test=False,
        )
    except Exception:
        traceback.print_exc()
        sys.exit(1)
@command("test-action-callback", description="Run an action callback in a testing mode")
@argument(
    "--root",
    "-r",
    default="taskcluster",
    help="root of the taskgraph definition relative to topsrcdir",
)
@argument(
    "--parameters",
    "-p",
    default="",
    help="parameters file (.yml or .json; see `taskcluster/docs/parameters.rst`)`",
)
@argument("--task-id", default=None, help="TaskId to which the action applies")
@argument(
    "--task-group-id", default=None, help="TaskGroupId to which the action applies"
)
@argument("--input", default=None, help="Action input (.yml or .json)")
@argument("callback", default=None, help="Action callback name (Python function name)")
def test_action_callback(options):
    """Run an action callback locally with test=True for development/debugging.

    Loads parameters and optional input from files instead of environment
    variables. Exits 1 (after printing the traceback) on any failure.
    """
    import taskgraph.actions
    import taskgraph.parameters
    from taskgraph.config import load_graph_config
    from taskgraph.util import json, yaml

    def load_data(filename):
        """Load a .yml or .json file; raise on any other extension."""
        with open(filename) as f:
            if filename.endswith(".yml"):
                return yaml.load_stream(f)
            elif filename.endswith(".json"):
                return json.load(f)
            else:
                # Bug fix: the f-string had no placeholder, so the actual
                # offending filename was never shown in the error message.
                raise Exception(f"unknown filename {filename}")

    try:
        task_id = options["task_id"]

        if options["input"]:
            input = load_data(options["input"])
        else:
            input = None

        root = options["root"]
        graph_config = load_graph_config(root)
        trust_domain = graph_config["trust-domain"]
        graph_config.register()

        parameters = taskgraph.parameters.load_parameters_file(
            options["parameters"], strict=False, trust_domain=trust_domain
        )
        parameters.check()

        return taskgraph.actions.trigger_action_callback(
            task_group_id=options["task_group_id"],
            task_id=task_id,
            input=input,
            callback=options["callback"],
            parameters=parameters,
            root=root,
            test=True,
        )
    except Exception:
        traceback.print_exc()
        sys.exit(1)
@command(
    "init", description="Initialize a new Taskgraph setup in a new or existing project."
)
@argument(
    "-f",
    "--force",
    action="store_true",
    default=False,
    help="Bypass safety checks.",
)
@argument(
    "--prompt",
    dest="no_input",
    action="store_false",
    default=True,
    help="Prompt for input rather than using default values (advanced).",
)
@argument(
    "--template",
    default="gh:taskcluster/taskgraph",
    help=argparse.SUPPRESS,  # used for testing
)
def init_taskgraph(options):
    """Scaffold a Taskgraph setup in the current repository via cookiecutter.

    Only Github-hosted git repositories are supported; returns 1 otherwise.
    """
    from cookiecutter.main import cookiecutter

    import taskgraph
    from taskgraph.util.vcs import get_repository

    repo = get_repository(os.getcwd())
    root = Path(repo.path)

    # Clean up existing installations if necessary.
    tc_yml = root.joinpath(".taskcluster.yml")
    if tc_yml.is_file():
        if not options["force"]:
            proceed = input(
                "A Taskcluster setup already exists in this repository, "
                "would you like to overwrite it? [y/N]: "
            ).lower()
            while proceed not in ("y", "yes", "n", "no"):
                # Bug fix: lower-case the re-entered answer as well;
                # previously e.g. "Y" at the retry prompt looped forever.
                proceed = input(f"Invalid option '{proceed}'! Try again: ").lower()

            if proceed[0] == "n":
                sys.exit(1)

        tc_yml.unlink()

    tg_dir = root.joinpath("taskcluster")
    if tg_dir.is_dir():
        shutil.rmtree(tg_dir)

    # Populate some defaults from the current repository.
    context = {"project_name": root.name, "taskgraph_version": taskgraph.__version__}

    try:
        repo_url = repo.get_url(remote=repo.remote_name)
    except RuntimeError:
        repo_url = ""

    if repo.tool == "git" and "github.com" in repo_url:
        context["repo_host"] = "github"
    else:
        print(
            dedent(
                """\
                Repository not supported!
                The `taskgraph init` command only supports repositories hosted on
                Github. Ensure you use a remote that points to a Github repository.
                """
            ),
            file=sys.stderr,
        )
        return 1

    context["repo_name"] = urlparse(repo_url).path.rsplit("/", 1)[-1]
    if context["repo_name"].endswith(".git"):
        context["repo_name"] = context["repo_name"][: -len(".git")]

    # Generate the project.
    cookiecutter(
        options["template"],
        checkout=taskgraph.__version__,
        directory="template",
        extra_context=context,
        no_input=options["no_input"],
        output_dir=str(root.parent),
        overwrite_if_exists=True,
    )
def create_parser():
    """Build the top-level argparse parser with one subparser per command."""
    parser = argparse.ArgumentParser(description="Interact with taskgraph")
    subparsers = parser.add_subparsers()
    for func, parser_args, parser_kwargs, defaults in commands.values():
        subparser = subparsers.add_parser(*parser_args, **parser_kwargs)
        for arg_names, arg_kwargs in func.args:
            subparser.add_argument(*arg_names, **arg_kwargs)
        subparser.set_defaults(command=func, **defaults)
    return parser
def setup_logging():
    """Configure root logging with timestamped, INFO-level console output."""
    log_format = "%(asctime)s - %(levelname)s - %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
def main(args=None):
    """CLI entry point: parse arguments and dispatch to the chosen command.

    Args:
        args: Argument list to parse. Defaults to ``sys.argv[1:]`` read at
            call time. (Previously the default was ``args=sys.argv[1:]``,
            which captured argv once at import time — the early-bound
            default-argument pitfall.)
    """
    if args is None:
        args = sys.argv[1:]
    setup_logging()
    parser = create_parser()
    if not args:
        parser.print_help()
        sys.exit(1)
    args = parser.parse_args(args)
    try:
        return args.command(vars(args))
    except Exception:
        traceback.print_exc()
        sys.exit(1)