Source code
Revision control
Copy as Markdown
Other Tools
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
import json
import logging
import os
import re
import subprocess
import sys
import mozpack.path as mozpath
from mach.decorators import Command, CommandArgument
PR_REPOSITORIES = {
"webrender": {
"github": "servo/webrender",
"path": "gfx/wr",
"bugzilla_product": "Core",
"bugzilla_component": "Graphics: WebRender",
},
"webgpu": {
"github": "gfx-rs/wgpu",
"path": "gfx/wgpu",
"bugzilla_product": "Core",
"bugzilla_component": "Graphics: WebGPU",
},
"debugger": {
"github": "firefox-devtools/debugger",
"path": "devtools/client/debugger",
"bugzilla_product": "DevTools",
"bugzilla_component": "Debugger",
},
}
@Command(
"import-pr",
category="misc",
description="Import a pull request from Github to the local repo.",
)
@CommandArgument("-b", "--bug-number", help="Bug number to use in the commit messages.")
@CommandArgument(
"-t",
"--bugzilla-token",
help="Bugzilla API token used to file a new bug if no bug number is provided.",
)
@CommandArgument("-r", "--reviewer", help="Reviewer nick to apply to commit messages.")
@CommandArgument(
"pull_request",
help="URL to the pull request to import (e.g. "
)
def import_pr(
command_context,
pull_request,
bug_number=None,
bugzilla_token=None,
reviewer=None,
):
import requests
pr_number = None
repository = None
for r in PR_REPOSITORIES.values():
if pull_request.startswith(GITHUB_ROOT + r["github"] + "/pull/"):
# sanitize URL, dropping anything after the PR number
pr_number = int(re.search("/pull/([0-9]+)", pull_request).group(1))
pull_request = GITHUB_ROOT + r["github"] + "/pull/" + str(pr_number)
repository = r
break
if repository is None:
command_context.log(
logging.ERROR,
"unrecognized_repo",
{},
"The pull request URL was not recognized; add it to the list of "
"recognized repos in PR_REPOSITORIES in %s" % __file__,
)
sys.exit(1)
command_context.log(
logging.INFO,
"import_pr",
{"pr_url": pull_request},
"Attempting to import {pr_url}",
)
dirty = [
f
for f in command_context.repository.get_changed_files(mode="all")
if f.startswith(repository["path"])
]
if dirty:
command_context.log(
logging.ERROR,
"dirty_tree",
repository,
"Local {path} tree is dirty; aborting!",
)
sys.exit(1)
target_dir = mozpath.join(
command_context.topsrcdir, os.path.normpath(repository["path"])
)
if bug_number is None:
if bugzilla_token is None:
command_context.log(
logging.WARNING,
"no_token",
{},
"No bug number or bugzilla API token provided; bug number will not "
"be added to commit messages.",
)
else:
bug_number = _file_bug(
command_context, bugzilla_token, repository, pr_number
)
elif bugzilla_token is not None:
command_context.log(
logging.WARNING,
"too_much_bug",
{},
"Providing a bugzilla token is unnecessary when a bug number is provided. "
"Using bug number; ignoring token.",
)
pr_patch = requests.get(pull_request + ".patch")
pr_patch.raise_for_status()
for patch in _split_patches(pr_patch.content, bug_number, pull_request, reviewer):
command_context.log(
logging.INFO,
"commit_msg",
patch,
"Processing commit [{commit_summary}] by [{author}] at [{date}]",
)
patch_cmd = subprocess.Popen(
["patch", "-p1", "-s"], stdin=subprocess.PIPE, cwd=target_dir
)
patch_cmd.stdin.write(patch["diff"].encode("utf-8"))
patch_cmd.stdin.close()
patch_cmd.wait()
if patch_cmd.returncode != 0:
command_context.log(
logging.ERROR,
"commit_fail",
{},
'Error applying diff from commit via "patch -p1 -s". Aborting...',
)
sys.exit(patch_cmd.returncode)
command_context.repository.commit(
patch["commit_msg"], patch["author"], patch["date"], [target_dir]
)
command_context.log(logging.INFO, "commit_pass", {}, "Committed successfully.")
def _file_bug(command_context, token, repo, pr_number):
import requests
bug = requests.post(
json={
"product": repo["bugzilla_product"],
"component": repo["bugzilla_component"],
"summary": "Land %s#%s in mozilla-central" % (repo["github"], pr_number),
"version": "unspecified",
},
)
bug.raise_for_status()
command_context.log(logging.DEBUG, "new_bug", {}, bug.content)
bugnumber = json.loads(bug.content)["id"]
command_context.log(
logging.INFO, "new_bug", {"bugnumber": bugnumber}, "Filed bug {bugnumber}"
)
return bugnumber
def _split_patches(patchfile, bug_number, pull_request, reviewer):
INITIAL = 0
HEADERS = 1
STAT_AND_DIFF = 2
patch = b""
state = INITIAL
for line in patchfile.splitlines():
if state == INITIAL:
if line.startswith(b"From "):
state = HEADERS
elif state == HEADERS:
patch += line + b"\n"
if line == b"---":
state = STAT_AND_DIFF
elif state == STAT_AND_DIFF:
if line.startswith(b"From "):
yield _parse_patch(patch, bug_number, pull_request, reviewer)
patch = b""
state = HEADERS
else:
patch += line + b"\n"
if len(patch) > 0:
yield _parse_patch(patch, bug_number, pull_request, reviewer)
return
def _parse_patch(patch, bug_number, pull_request, reviewer):
import email
from email import header, policy
parse_policy = policy.compat32.clone(max_line_length=None)
parsed_mail = email.message_from_bytes(patch, policy=parse_policy)
def header_as_unicode(key):
decoded = header.decode_header(parsed_mail[key])
return str(header.make_header(decoded))
author = header_as_unicode("From")
date = header_as_unicode("Date")
commit_summary = header_as_unicode("Subject")
email_body = parsed_mail.get_payload(decode=True).decode("utf-8")
(commit_body, diff) = ("\n" + email_body).rsplit("\n---\n", 1)
bug_prefix = ""
if bug_number is not None:
bug_prefix = "Bug %s - " % bug_number
commit_summary = re.sub(r"^\[PATCH[0-9 /]*\] ", bug_prefix, commit_summary)
if reviewer is not None:
commit_summary += " r=" + reviewer
commit_msg = commit_summary + "\n"
if len(commit_body) > 0:
commit_msg += commit_body + "\n"
commit_msg += "\n[import_pr] From " + pull_request + "\n"
patch_obj = {
"author": author,
"date": date,
"commit_summary": commit_summary,
"commit_msg": commit_msg,
"diff": diff,
}
return patch_obj