Source code
Revision control
Copy as Markdown
Other Tools
#!/usr/bin/env python3
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
import base64
import io
import json
import os
import subprocess
import sys
import tempfile
import time
import traceback
from mozlog import formatters, handlers, structuredlog
from PIL import Image, ImageChops, ImageDraw
from pixelmatch.contrib.PIL import pixelmatch
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
class SnapTestsBase:
_INSTANCE = None
def __init__(self, exp):
self._INSTANCE = os.environ.get("TEST_SNAP_INSTANCE")
self._PROFILE_PATH = f"~/snap/{self._INSTANCE}/common/.mozilla/firefox/"
self._LIB_PATH = rf"/snap/{self._INSTANCE}/current/usr/lib/firefox/libxul.so"
# This needs to be the snap-based symlink geckodriver to properly setup
# the Snap environment
self._EXE_PATH = rf"/snap/bin/{self._INSTANCE}.geckodriver"
# This needs to be the full path to the binary because at the moment of
# its execution it will already be under the Snap environment, and
# running "snap" command in this context will fail with Permission denied
#
# This can be trivially verified by the following shell script being used
# as binary_location/_BIN_PATH below:
#
# 8<-8<-8<-8<-8<-8<-8<-8<-8<-8<-8<-8<-
# #!/bin/sh
# TMPDIR=${TMPDIR:-/tmp}
# env > $TMPDIR/snap-test.txt
# 8<-8<-8<-8<-8<-8<-8<-8<-8<-8<-8<-8<-
#
# One should see output generated in the instance-specific temp dir
# /tmp/snap-private-tmp/snap.{}/tmp/snap-test.txt
# denoting that everything properly runs under Snap as expected.
self._BIN_PATH = rf"/snap/{self._INSTANCE}/current/usr/lib/firefox/firefox"
snap_profile_path = tempfile.mkdtemp(
prefix="snap-tests",
dir=os.path.expanduser(self._PROFILE_PATH),
)
driver_service_args = []
if self.need_allow_system_access():
driver_service_args += ["--allow-system-access"]
driver_service = Service(
executable_path=self._EXE_PATH,
log_output=os.path.join(
os.environ.get("ARTIFACT_DIR", ""), "geckodriver.log"
),
service_args=driver_service_args,
)
options = Options()
if "TEST_GECKODRIVER_TRACE" in os.environ.keys():
options.log.level = "trace"
options.binary_location = self._BIN_PATH
if not "TEST_NO_HEADLESS" in os.environ.keys():
options.add_argument("--headless")
if "MOZ_AUTOMATION" in os.environ.keys():
os.environ["MOZ_LOG_FILE"] = os.path.join(
os.environ.get("ARTIFACT_DIR"), "gecko.log"
)
options.add_argument("-profile")
options.add_argument(snap_profile_path)
self._driver = webdriver.Firefox(service=driver_service, options=options)
self._logger = structuredlog.StructuredLogger(self.__class__.__name__)
self._logger.add_handler(
handlers.StreamHandler(sys.stdout, formatters.TbplFormatter())
)
test_filter = "test_{}".format(os.environ.get("TEST_FILTER", ""))
object_methods = [
method_name
for method_name in dir(self)
if callable(getattr(self, method_name))
and method_name.startswith(test_filter)
]
self._logger.suite_start(object_methods)
assert self._dir is not None
self._update_channel = None
self._version_major = None
self._snap_core_base = None
self._is_debug_build = None
if self.is_debug_build():
self._logger.info("Running against a DEBUG build")
else:
self._logger.info("Running against a OPT build")
self._driver.maximize_window()
self._wait = WebDriverWait(self._driver, self.get_timeout())
self._longwait = WebDriverWait(self._driver, 60)
with open(exp) as j:
self._expectations = json.load(j)
# exit code ; will be set to 1 at first assertion failure
ec = 0
first_tab = self._driver.window_handles[0]
channel = self.update_channel()
if self.is_esr_128():
channel = "esr-128"
core_base = self.snap_core_base()
channel_and_core = f"{channel}core{core_base}"
self._logger.info(f"Channel & Core: {channel_and_core}")
for m in object_methods:
tabs_before = set()
tabs_after = set()
self._logger.test_start(m)
expectations = (
self._expectations[m]
if not channel_and_core in self._expectations[m]
else self._expectations[m][channel_and_core]
)
self._driver.switch_to.window(first_tab)
try:
tabs_before = set(self._driver.window_handles)
rv = getattr(self, m)(expectations)
assert rv is not None, "test returned no value"
tabs_after = set(self._driver.window_handles)
self._logger.info(f"tabs_after OK {tabs_after}")
self._driver.switch_to.parent_frame()
if rv:
self._logger.test_end(m, status="OK")
else:
self._logger.test_end(m, status="FAIL")
except Exception as ex:
ec = 1
test_status = "ERROR"
if isinstance(ex, AssertionError):
test_status = "FAIL"
elif isinstance(ex, TimeoutException):
test_status = "TIMEOUT"
test_message = repr(ex)
self.save_screenshot(
f"screenshot_{m.lower()}_{test_status.lower()}.png"
)
self._driver.switch_to.parent_frame()
self.save_screenshot(
f"screenshot_{m.lower()}_{test_status.lower()}_parent.png"
)
self._logger.test_end(m, status=test_status, message=test_message)
traceback.print_exc()
tabs_after = set(self._driver.window_handles)
self._logger.info(f"tabs_after EXCEPTION {tabs_after}")
finally:
self._logger.info(f"tabs_before {tabs_before}")
tabs_opened = tabs_after - tabs_before
self._logger.info(f"opened {len(tabs_opened)} tabs")
self._logger.info(f"opened {tabs_opened} tabs")
closed = 0
for tab in tabs_opened:
self._logger.info(f"switch to {tab}")
self._driver.switch_to.window(tab)
self._logger.info(f"close {tab}")
self._driver.close()
closed += 1
self._logger.info(
f"wait EC.number_of_windows_to_be({len(tabs_after) - closed})"
)
self._wait.until(
EC.number_of_windows_to_be(len(tabs_after) - closed)
)
self._driver.switch_to.window(first_tab)
if not "TEST_NO_QUIT" in os.environ.keys():
self._driver.quit()
self._logger.info(f"Exiting with {ec}")
self._logger.suite_end()
sys.exit(ec)
def get_screenshot_destination(self, name):
final_name = name
if "MOZ_AUTOMATION" in os.environ.keys():
final_name = os.path.join(os.environ.get("ARTIFACT_DIR"), name)
return final_name
def save_screenshot(self, name):
final_name = self.get_screenshot_destination(name)
self._logger.info(f"Saving screenshot '{name}' to '{final_name}'")
try:
self._driver.save_screenshot(final_name)
except WebDriverException as ex:
self._logger.info(f"Saving screenshot FAILED due to {ex}")
def get_timeout(self):
if "TEST_TIMEOUT" in os.environ.keys():
return int(os.getenv("TEST_TIMEOUT"))
else:
return 5
def maybe_collect_reference(self):
return "TEST_COLLECT_REFERENCE" in os.environ.keys()
def open_tab(self, url):
opened_tabs = len(self._driver.window_handles)
self._driver.switch_to.new_window("tab")
self._wait.until(EC.number_of_windows_to_be(opened_tabs + 1))
self._driver.get(url)
return self._driver.current_window_handle
def is_debug_build(self):
if self._is_debug_build is None:
self._is_debug_build = (
"with debug_info"
in subprocess.check_output(["file", self._LIB_PATH]).decode()
)
return self._is_debug_build
def need_allow_system_access(self):
geckodriver_output = subprocess.check_output(
[self._EXE_PATH, "--help"]
).decode()
return "--allow-system-access" in geckodriver_output
def update_channel(self):
if self._update_channel is None:
self._driver.set_context("chrome")
self._update_channel = self._driver.execute_script(
"return Services.prefs.getStringPref('app.update.channel');"
)
self._logger.info(f"Update channel: {self._update_channel}")
self._driver.set_context("content")
return self._update_channel
def snap_core_base(self):
if self._snap_core_base is None:
self._driver.set_context("chrome")
self._snap_core_base = self._driver.execute_script(
"return Services.sysinfo.getProperty('distroVersion');"
)
self._logger.info(f"Snap Core: {self._snap_core_base}")
self._driver.set_context("content")
return self._snap_core_base
def version(self):
self._driver.set_context("chrome")
version = self._driver.execute_script("return AppConstants.MOZ_APP_VERSION;")
self._driver.set_context("content")
return version
def version_major(self):
if self._version_major is None:
self._driver.set_context("chrome")
self._version_major = self._driver.execute_script(
"return AppConstants.MOZ_APP_VERSION.split('.')[0];"
)
self._logger.info(f"Version major: {self._version_major}")
self._driver.set_context("content")
return self._version_major
def is_esr_128(self):
return self.update_channel() == "esr" and self.version_major() == "128"
def assert_rendering(self, exp, element_or_driver):
# wait a bit for things to settle down
time.sleep(0.5)
# Convert as RGB otherwise we cannot get difference
png_bytes = (
element_or_driver.screenshot_as_png
if isinstance(element_or_driver, WebElement)
else (
element_or_driver.get_screenshot_as_png()
if isinstance(element_or_driver, WebDriver)
else base64.b64decode(element_or_driver)
)
)
svg_png = Image.open(io.BytesIO(png_bytes)).convert("RGB")
svg_png_cropped = svg_png.crop((0, 35, svg_png.width - 20, svg_png.height - 10))
if self.maybe_collect_reference():
new_ref = "new_{}".format(exp["reference"])
new_ref_file = self.get_screenshot_destination(new_ref)
self._logger.info(
f"Collecting new reference screenshot: {new_ref} => {new_ref_file}"
)
with open(new_ref_file, "wb") as current_screenshot:
svg_png_cropped.save(current_screenshot)
return
svg_ref = Image.open(os.path.join(self._dir, exp["reference"])).convert("RGB")
diff = ImageChops.difference(svg_ref, svg_png_cropped)
bbox = diff.getbbox()
mismatch = 0
try:
mismatch = pixelmatch(
svg_png_cropped, svg_ref, diff, includeAA=False, threshold=0.15
)
if mismatch == 0:
return
self._logger.info(f"Non empty differences from pixelmatch: {mismatch}")
except ValueError as ex:
self._logger.info(f"Problem at pixelmatch: {ex}")
current_rendering_png = "pixelmatch_current_rendering_{}".format(
exp["reference"]
)
with open(
self.get_screenshot_destination(current_rendering_png), "wb"
) as current_screenshot:
svg_png_cropped.save(current_screenshot)
reference_rendering_png = "pixelmatch_reference_rendering_{}".format(
exp["reference"]
)
with open(
self.get_screenshot_destination(reference_rendering_png), "wb"
) as current_screenshot:
svg_ref.save(current_screenshot)
if bbox is not None:
(left, upper, right, lower) = bbox
assert mismatch > 0, "Really mismatching"
bbox_w = right - left
bbox_h = lower - upper
diff_px_on_bbox = round((mismatch * 1.0 / (bbox_w * bbox_h)) * 100, 3)
allowance = exp["allowance"] if "allowance" in exp else 0.15
self._logger.info(
f"Bbox: {bbox_w}x{bbox_h} => {diff_px_on_bbox}% ({allowance}% allowed)"
)
if diff_px_on_bbox <= allowance:
return
(diff_r, diff_g, diff_b) = diff.getextrema()
draw_ref = ImageDraw.Draw(svg_ref)
draw_ref.rectangle(bbox, outline="red")
draw_rend = ImageDraw.Draw(svg_png_cropped)
draw_rend.rectangle(bbox, outline="red")
draw_diff = ImageDraw.Draw(diff)
draw_diff.rectangle(bbox, outline="red")
# Some differences have been found, let's verify
self._logger.info(f"Non empty differences bbox: {bbox}")
buffered = io.BytesIO()
diff.save(buffered, format="PNG")
if "TEST_DUMP_DIFF" in os.environ.keys():
diff_b64 = base64.b64encode(buffered.getvalue())
self._logger.info(
"data:image/png;base64,{}".format(diff_b64.decode("utf-8"))
)
differences_png = "differences_{}".format(exp["reference"])
with open(
self.get_screenshot_destination(differences_png), "wb"
) as diff_screenshot:
diff_screenshot.write(buffered.getvalue())
current_rendering_png = "current_rendering_{}".format(exp["reference"])
with open(
self.get_screenshot_destination(current_rendering_png), "wb"
) as current_screenshot:
svg_png_cropped.save(current_screenshot)
reference_rendering_png = "reference_rendering_{}".format(exp["reference"])
with open(
self.get_screenshot_destination(reference_rendering_png), "wb"
) as current_screenshot:
svg_ref.save(current_screenshot)
(left, upper, right, lower) = bbox
assert right >= left, f"Inconsistent boundaries right={right} left={left}"
assert (
lower >= upper
), f"Inconsistent boundaries lower={lower} upper={upper}"
if ((right - left) <= 2) or ((lower - upper) <= 2):
self._logger.info("Difference is a <= 2 pixels band, ignoring")
return
assert (
diff_px_on_bbox <= allowance
), "Mismatching screenshots for {}".format(exp["reference"])
class SnapTests(SnapTestsBase):
def __init__(self, exp):
self._dir = "basic_tests"
super(__class__, self).__init__(exp)
def test_snap_core_base(self, exp):
assert self.snap_core_base() in ["22", "24"], "Core base should be 22 or 24"
return True
def test_about_support(self, exp):
self.open_tab("about:support")
version_box = self._wait.until(
EC.visibility_of_element_located((By.ID, "version-box"))
)
self._wait.until(lambda d: len(version_box.text) > 0)
self._logger.info(f"about:support version: {version_box.text}")
assert version_box.text == exp["version_box"], "version text should match"
distributionid_box = self._wait.until(
EC.visibility_of_element_located((By.ID, "distributionid-box"))
)
self._wait.until(lambda d: len(distributionid_box.text) > 0)
self._logger.info(f"about:support distribution ID: {distributionid_box.text}")
assert (
distributionid_box.text == exp["distribution_id"]
), "distribution_id should match"
windowing_protocol = self._driver.execute_script(
"return document.querySelector('th[data-l10n-id=\"graphics-window-protocol\"').parentNode.lastChild.textContent;"
)
self._logger.info(f"about:support windowing protocol: {windowing_protocol}")
assert windowing_protocol == "wayland", "windowing protocol should be wayland"
return True
def test_about_buildconfig(self, exp):
self.open_tab("about:buildconfig")
source_link = self._wait.until(
EC.visibility_of_element_located((By.CSS_SELECTOR, "a"))
)
self._wait.until(lambda d: len(source_link.text) > 0)
self._logger.info(f"about:buildconfig source: {source_link.text}")
assert source_link.text.startswith(
exp["source_repo"]
), "source repo should exists and match"
build_flags_box = self._wait.until(
EC.visibility_of_element_located((By.CSS_SELECTOR, "p:last-child"))
)
self._wait.until(lambda d: len(build_flags_box.text) > 0)
self._logger.info(f"about:support buildflags: {build_flags_box.text}")
assert (
build_flags_box.text.find(exp["official"]) >= 0
), "official build flag should be there"
return True
def test_youtube(self, exp):
# Skip because unreliable
return True
# Wait so we leave time to breathe and not be classified as a bot
time.sleep(5)
# Wait for the consent dialog and accept it
self._logger.info("Wait for consent form")
try:
self._wait.until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, "button[aria-label*=Accept]")
)
).click()
except TimeoutException:
self._logger.info("Wait for consent form: timed out, maybe it is not here")
# Wait so we leave time to breathe and not be classified as a bot
time.sleep(3)
# Wait for the cable TV dialog and accept it
self._logger.info("Wait for cable proposal")
try:
self._wait.until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, "button[aria-label*=Dismiss]")
)
).click()
except TimeoutException:
self._logger.info(
"Wait for cable proposal: timed out, maybe it is not here"
)
# Wait so we leave time to breathe and not be classified as a bot
time.sleep(3)
# Find first video and click it
self._logger.info("Wait for one video")
self._wait.until(
EC.visibility_of_element_located((By.ID, "video-title-link"))
).click()
# Wait so we leave time to breathe and not be classified as a bot
time.sleep(3)
# Wait for duration to be set to something
self._logger.info("Wait for video to start")
video = None
try:
video = self._longwait.until(
EC.visibility_of_element_located((By.CLASS_NAME, "html5-main-video"))
)
self._longwait.until(
lambda d: type(video.get_property("duration")) is float
)
self._logger.info(
"video duration: {}".format(video.get_property("duration"))
)
assert (
video.get_property("duration") > exp["duration"]
), "youtube video should have duration"
self._wait.until(
lambda d: video.get_property("currentTime") > exp["playback"]
)
self._logger.info(
"video played: {}".format(video.get_property("currentTime"))
)
assert (
video.get_property("currentTime") > exp["playback"]
), "youtube video should perform playback"
except TimeoutException as ex:
self._logger.info("video detection timed out")
self._logger.info(f"video: {video}")
if video:
self._logger.info(
"video duration: {}".format(video.get_property("duration"))
)
raise ex
return True
def wait_for_enable_drm(self):
rv = True
self._driver.set_context("chrome")
self._driver.execute_script(
"Services.prefs.setBoolPref('media.gmp-manager.updateEnabled', true);"
)
enable_drm_button = self._wait.until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, ".notification-button[label='Enable DRM']")
)
)
self._logger.info("Enabling DRMs")
enable_drm_button.click()
self._wait.until(
EC.invisibility_of_element_located(
(By.CSS_SELECTOR, ".notification-button[label='Enable DRM']")
)
)
self._logger.info("Installing DRMs")
self._wait.until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, ".infobar[value='drmContentCDMInstalling']")
)
)
self._logger.info("Waiting for DRMs installation to complete")
self._longwait.until(
EC.invisibility_of_element_located(
(By.CSS_SELECTOR, ".infobar[value='drmContentCDMInstalling']")
)
)
self._driver.set_context("content")
return rv
def test_youtube_film(self, exp):
return True
if not self.wait_for_enable_drm():
self._logger.info("Skipped on ESR because cannot enable DRM")
return True
# Wait for duration to be set to something
self._logger.info("Wait for video to start")
video = self._wait.until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, "video.html5-main-video")
)
)
self._wait.until(lambda d: type(video.get_property("duration")) is float)
self._logger.info("video duration: {}".format(video.get_property("duration")))
assert (
video.get_property("duration") > exp["duration"]
), "youtube video should have duration"
self._driver.execute_script("arguments[0].click();", video)
video.send_keys("k")
self._wait.until(lambda d: video.get_property("currentTime") > exp["playback"])
self._logger.info("video played: {}".format(video.get_property("currentTime")))
assert (
video.get_property("currentTime") > exp["playback"]
), "youtube video should perform playback"
return True
if __name__ == "__main__":
SnapTests(exp=sys.argv[1])