Source code
Revision control
Copy as Markdown
Other Tools
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import json
import os
import re
import signal
import socket
import subprocess
import tempfile
import threading
from pathlib import Path
from subprocess import PIPE, Popen
import filters
from base_python_support import BasePythonSupport
from logger.logger import RaptorLogger
# Module-level logger used by every method in this file; it is created with
# the component name "raptor-browsertime".
LOG = RaptorLogger(component="raptor-browsertime")
class NetworkBench(BasePythonSupport):
def __init__(self, **kwargs):
    """Create the support object in its idle, nothing-started state.

    Every keyword argument is forwarded unchanged to the parent class.
    """
    super().__init__(**kwargs)

    # Browser and tooling information, filled in by setup_test() and
    # modify_command().
    self._is_chrome = False
    self.browsertime_node = None

    # Test flavour; setup_test() overwrites both from the test name.
    self.http_version = "h1"
    self.transfer_type = "download"

    # Network shaping defaults: loopback interface, no throttling, no loss.
    self.interface = "lo"
    self.network_type = "unthrottled"
    self.packet_loss_rate = None

    # Helper processes, the ports they use, and the threads that drain
    # Caddy's stdout/stderr.
    self.backend_server = None
    self.backend_port = None
    self.caddy_server = None
    self.caddy_port = None
    self.caddy_stdout = None
    self.caddy_stderr = None

    # Callables that clean_up() pops and runs, last registered first.
    self.cleanup = []
def setup_test(self, test, args):
    """Derive browser family, HTTP version and transfer direction for a test.

    The test name is expected to look like
    ``<http_version>-<transfer_type>[-<anything>]`` where the HTTP version is
    ``h3`` or ``h2`` and the transfer type is ``download`` or ``upload``.

    Args:
        test: Test description mapping; only its ``"name"`` entry is read.
        args: Parsed command-line arguments; only ``args.app`` is read.

    Raises:
        Exception: "Unsupported test" when the name does not follow the
            scheme above (including a missing name or one without "-").
    """
    from cmdline import CHROME_ANDROID_APPS, CHROMIUM_DISTROS

    LOG.info("setup_test: '%s'" % test)

    self._is_chrome = (
        args.app in CHROMIUM_DISTROS or args.app in CHROME_ANDROID_APPS
    )

    # Pad to two parts so that a missing name, or a name without "-", is
    # reported as an unsupported test rather than escaping as an
    # AttributeError/IndexError.
    name_parts = (test.get("name") or "").split("-", 2)
    name_parts += [""] * (2 - len(name_parts))
    version_part, transfer_part = name_parts[0], name_parts[1]

    self.http_version = version_part if version_part in ("h3", "h2") else "unknown"
    self.transfer_type = (
        transfer_part if transfer_part in ("download", "upload") else "unknown"
    )
    LOG.info(f"http_version: '{self.http_version}', type: '{self.transfer_type}'")

    if self.http_version == "unknown" or self.transfer_type == "unknown":
        raise Exception("Unsupported test")
def check_caddy_installed(self):
    """Report whether the ``caddy`` executable can be run.

    Executes ``caddy version`` and logs the outcome.

    Returns:
        bool: True when the command could be started and exited with
        status 0, otherwise False.
    """
    try:
        probe = subprocess.run(
            ["caddy", "version"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except FileNotFoundError:
        # The executable could not be found at all.
        LOG.error("Caddy is not installed.")
        return False

    if probe.returncode != 0:
        LOG.error("Caddy is not installed.")
        return False

    LOG.info("Caddy is installed. Version: %s" % probe.stdout.strip())
    return True
def start_backend_server(self, path):
    """Launch the Node.js backend server and learn which port it listens on.

    Args:
        path: File name of the server script, resolved relative to the
            ``browsertime/utils`` directory two levels above this file.

    Returns:
        The running ``Popen`` object once a port has been parsed from the
        server's first line of stdout (the port is stored, as a string, in
        ``self.backend_port``). None when the node binary or the script is
        missing, or when no port could be parsed.
    """
    if self.browsertime_node is None or not self.browsertime_node.exists():
        return None
    LOG.info("node bin: %s" % self.browsertime_node)

    server_path = (
        Path(__file__).parent / ".." / ".." / "browsertime" / "utils" / path
    )
    LOG.info("server_path: %s" % server_path)
    if not server_path.exists():
        return None

    process = Popen(
        [self.browsertime_node, server_path],
        stdin=PIPE,
        stdout=PIPE,
        stderr=PIPE,
        universal_newlines=True,
        # Own session/process group so shutdown_server() can signal the
        # whole group.
        start_new_session=True,
    )

    msg = process.stdout.readline()
    LOG.info("server msg: %s" % msg)

    # Pull the port out of the startup banner.
    # NOTE(review): assumes the server announces itself with a URL of the
    # form "http(s)://<host>:<port>" on its first stdout line -- confirm
    # against the server script.
    match = re.search(r"https?://\S+:(\d+)", msg)
    if match:
        self.backend_port = match.group(1)
        LOG.info("backend port: %s" % self.backend_port)
        return process

    # Without a port the server is unusable; stop it rather than leaving an
    # untracked node process behind.
    LOG.error("could not determine backend port from: %s" % msg)
    self.shutdown_server("Backend", process)
    return None
def find_free_port(self, socket_type):
    """Ask the operating system for a currently unused port on localhost.

    Args:
        socket_type: Socket type handed to ``socket.socket`` (the callers in
            this file pass ``socket.SOCK_STREAM`` or ``socket.SOCK_DGRAM``).

    Returns:
        int: The port the temporary socket was bound to. The socket is closed
        before returning, so the port is only known to have been free at the
        moment of the call.
    """
    probe = socket.socket(socket.AF_INET, socket_type)
    with probe:
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Binding to port 0 lets the OS choose the port.
        probe.bind(("localhost", 0))
        address = probe.getsockname()
    return address[1]
def start_caddy(self, test_file_path):
    """Write a Caddy JSON config for the current test and launch Caddy.

    Upload tests: every request is reverse-proxied to the backend server on
    ``localhost:<self.backend_port>``.
    Download tests: a small page, ``download_test.html``, is generated in
    MOZ_FETCHES_DIR and served from there together with the test file; the
    test file is delivered with response headers that disable caching.

    The generated HTML page and the temporary config file are registered in
    ``self.cleanup`` for removal. Caddy's stdout and stderr are forwarded to
    the log by two threads stored in ``self.caddy_stdout`` and
    ``self.caddy_stderr``.

    Args:
        test_file_path: ``Path`` of the payload file; only its ``name`` is
            used, and only for download tests.

    Returns:
        The ``Popen`` object of the Caddy process, or None when Caddy is not
        installed, ``self.caddy_port`` is unset or out of range, the TLS
        key/certificate or MOZ_FETCHES_DIR is missing, or the transfer type
        is neither "upload" nor "download".
    """
    if not self.check_caddy_installed():
        return None
    if self.caddy_port is None or not (1 <= self.caddy_port <= 65535):
        return None

    utils_path = Path(__file__).parent / ".." / ".." / "browsertime" / "utils"
    if not utils_path.exists():
        return None

    key_path = utils_path / "http2-cert.key"
    LOG.info("key_path: %s" % key_path)
    if not key_path.exists():
        return None

    pem_path = utils_path / "http2-cert.pem"
    LOG.info("pem_path: %s" % pem_path)
    if not pem_path.exists():
        return None

    port_str = f":{self.caddy_port}"

    if self.transfer_type == "upload":
        upstream = f"localhost:{self.backend_port}"
        routes = [
            {
                "handle": [
                    {
                        "handler": "reverse_proxy",
                        "upstreams": [{"dial": upstream}],
                    }
                ]
            }
        ]
    elif self.transfer_type == "download":
        fetches_dir = Path(os.environ.get("MOZ_FETCHES_DIR", "None"))
        if not fetches_dir.exists():
            return None

        def generate_download_test_html(fetches_dir, test_file_name):
            # Doubled braces are literal braces for str.format(); only
            # {test_file_name} is substituted.
            html_content = """
<!DOCTYPE html>
<html>
<head>
<title>Download test</title>
</head>
<body>
<section>Download test</section>
<button id='downloadBtn'>Download Test</button>
<p id='download_status'></p>
<script>
let download_status = '';
function set_status(status) {{
download_status = status;
console.log('download_status:' + status);
document.getElementById('download_status').textContent = status;
}}
set_status('not_started');
const handleDownloadTest = () => {{
set_status('started');
const startTime = performance.now();
console.log('start');
fetch('/{test_file_name}')
.then((response) => response.blob())
.then((_) => {{
console.log('done');
const endTime = performance.now();
const downloadTime = endTime - startTime;
set_status('success time:' + downloadTime);
}})
.catch((error) => {{
console.error(error);
set_status('error');
}});
}};
document
.querySelector('#downloadBtn')
.addEventListener('click', handleDownloadTest);
</script>
</body>
</html>
""".format(
                test_file_name=test_file_name
            )
            html_file_path = fetches_dir / "download_test.html"
            with html_file_path.open("w", encoding="utf-8") as html_file:
                html_file.write(html_content)
            self.cleanup.append(lambda: html_file_path.unlink())

        generate_download_test_html(fetches_dir, test_file_path.name)

        routes = [
            {
                "match": [{"path": ["/download_test.html"]}],
                "handle": [
                    {
                        "handler": "file_server",
                        "root": str(fetches_dir),
                        "browse": {},
                    }
                ],
            },
            {
                "match": [{"path": [f"/{test_file_path.name}"]}],
                "handle": [
                    {
                        "handler": "headers",
                        "response": {
                            "set": {
                                "Cache-Control": [
                                    "no-store",
                                    "no-cache",
                                    "must-revalidate",
                                    "proxy-revalidate",
                                    "max-age=0",
                                ],
                                "Pragma": ["no-cache"],
                                "Expires": ["0"],
                            }
                        },
                    },
                    {
                        "handler": "file_server",
                        "root": str(fetches_dir),
                        "browse": {},
                    },
                ],
            },
        ]
    else:
        # Without this branch `routes` would be unbound further down.
        LOG.error("Unsupported transfer type: %s" % self.transfer_type)
        return None

    # modify_command() reserves a UDP port for h3 and a TCP port otherwise,
    # so the enabled protocols have to follow the HTTP version.
    # NOTE(review): this used to be ["h3"] unconditionally; confirm that
    # non-h3 tests are meant to be served by this Caddy instance.
    protocols = ["h3"] if self.http_version == "h3" else ["h1", "h2"]

    caddyfile_content = {
        "admin": {"disabled": True},
        "apps": {
            "http": {
                "servers": {
                    "server1": {
                        "listen": [port_str],
                        "protocols": protocols,
                        "routes": routes,
                        "tls_connection_policies": [
                            {"certificate_selection": {"any_tag": ["cert1"]}}
                        ],
                        "automatic_https": {"disable": True},
                    }
                },
            },
            "tls": {
                "certificates": {
                    "load_files": [
                        {
                            "certificate": str(pem_path),
                            "key": str(key_path),
                            "tags": ["cert1"],
                        }
                    ]
                }
            },
        },
    }
    LOG.info("caddyfile_content: %s" % caddyfile_content)

    with tempfile.NamedTemporaryFile(
        mode="w", delete=False, suffix=".json"
    ) as temp_json_file:
        json.dump(caddyfile_content, temp_json_file, indent=2)
        temp_json_file_path = temp_json_file.name
    LOG.info("temp_json_file_path: %s" % temp_json_file_path)

    def remove_config_file():
        try:
            Path(temp_json_file_path).unlink()
        except FileNotFoundError:
            pass

    # delete=False keeps the file on disk for Caddy to read, so it must be
    # removed explicitly. Registered before Popen so it is cleaned up even
    # if starting Caddy raises.
    self.cleanup.append(remove_config_file)

    command = ["caddy", "run", "--config", temp_json_file_path]

    def read_output(pipe, log_func):
        # readline() returns "" at end of stream, which ends the loop.
        for line in iter(pipe.readline, ""):
            log_func(line)

    process = Popen(
        command,
        stdin=PIPE,
        stdout=PIPE,
        stderr=PIPE,
        universal_newlines=True,
        start_new_session=True,
    )
    self.caddy_stdout = threading.Thread(
        target=read_output, args=(process.stdout, LOG.info)
    )
    self.caddy_stderr = threading.Thread(
        target=read_output, args=(process.stderr, LOG.info)
    )
    self.caddy_stdout.start()
    self.caddy_stderr.start()
    return process
def check_tc_command(self):
    """Report whether ``sudo tc`` can be executed.

    Runs ``sudo tc -help`` and logs the outcome.

    Returns:
        bool: True when the command exited with status 0; False when it
        exited with another status or could not be run at all.
    """
    try:
        probe = subprocess.run(
            ["sudo", "tc", "-help"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except Exception as e:
        LOG.error(f"Error executing tc: {str(e)}")
        return False

    if probe.returncode != 0:
        LOG.error("tc is not available")
        return False

    LOG.info("tc can be executed as root")
    return True
def run_command(self, command):
    """Run a shell command and report whether it succeeded.

    Args:
        command: Command line, executed through the shell.

    Returns:
        bool: True when the command exited with status 0 (the command is
        then logged); False when it exited with a non-zero status (the
        command and its stderr are then logged).
    """
    try:
        subprocess.run(
            command,
            shell=True,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as failure:
        LOG.info(f"Error executing command: {command}")
        LOG.info(f"Error: {failure.stderr.decode()}")
        return False
    else:
        LOG.info(command)
        return True
def network_type_to_bandwidth_rtt(self, network_type):
    """Translate a network type label into its bandwidth and round-trip time.

    Args:
        network_type: One of "1M_400ms", "300M_40ms", "300M_80ms",
            "10M_40ms" or "100M_40ms".

    Returns:
        tuple: ``(bandwidth, rtt_ms)`` where ``bandwidth`` is a string such
        as "10Mbit" and ``rtt_ms`` is an int.

    Raises:
        Exception: When ``network_type`` is not one of the known labels.
    """
    # label -> (bandwidth, round-trip time in milliseconds)
    known_profiles = {
        "1M_400ms": ("1Mbit", 400),
        "300M_40ms": ("300Mbit", 40),
        "300M_80ms": ("300Mbit", 80),
        "10M_40ms": ("10Mbit", 40),
        "100M_40ms": ("100Mbit", 40),
    }

    if network_type not in known_profiles:
        raise Exception(f"Unknown network type: {network_type}")

    bandwidth, rtt_ms = known_profiles[network_type]
    return bandwidth, rtt_ms
def apply_network_throttling(self, interface, network_type, loss, port):
    """Throttle UDP traffic to and from ``port`` on ``interface`` using ``tc``.

    Replaces the root qdisc of the interface with an HTB qdisc, adds one
    class limited to the bandwidth of ``network_type``, attaches a netem
    qdisc (delay, optional packet loss, queue limit) to that class, and adds
    u32 filters that steer IPv4 and IPv6 UDP packets with source or
    destination port ``port`` into the class.

    Once the root qdisc has been added, a callable that deletes it again is
    registered in ``self.cleanup``.

    Args:
        interface: Network interface name, e.g. "lo".
        network_type: Label understood by network_type_to_bandwidth_rtt().
        loss: Packet loss percentage; a false value or "0" means no loss.
        port: Port whose UDP traffic is throttled.

    Returns:
        bool: True when every command after the initial qdisc deletion
        succeeded, False as soon as one of them fails.

    Raises:
        Exception: When ``network_type`` is unknown.
    """
    bandwidth_str, rtt_ms = self.network_type_to_bandwidth_rtt(network_type)
    # The delay used in netem is applied before sending,
    # so the delay value should be rtt_ms / 2.
    delay_ms = rtt_ms / 2
    bandwidth_mbit = float(bandwidth_str.replace("Mbit", ""))

    # Bandwidth-delay product: Mbit/s * 1000 = kbit/s; kbit/s * ms = bits;
    # bits / 8 = bytes. Never below 1500.
    # NOTE(review): this byte count is passed as netem's "limit"; confirm
    # the unit netem expects for that option.
    bdp_bytes = max(int(bandwidth_mbit * 1_000 * rtt_ms / 8), 1500)

    LOG.info(
        f"apply_network_throttling: bandwidth={bandwidth_str} delay={delay_ms}ms loss={loss}"
    )

    # Drop whatever root qdisc is present; the result is deliberately
    # ignored because there may be none to delete.
    self.run_command(f"sudo tc qdisc del dev {interface} root")

    if not self.run_command(
        f"sudo tc qdisc add dev {interface} root handle 1: htb default 12"
    ):
        return False

    LOG.info("Register cleanup function")
    # Undo the throttling on the interface that was actually modified (the
    # argument, not self.interface).
    self.cleanup.append(
        lambda: self.run_command(f"sudo tc qdisc del dev {interface} root")
    )

    delay_str = f"{delay_ms}ms"
    loss_clause = "" if (not loss or loss == "0") else f" loss {loss}%"

    commands = [
        f"sudo tc class add dev {interface} parent 1: classid 1:1 htb rate {bandwidth_str}",
        f"sudo tc qdisc add dev {interface} parent 1:1 handle 10: "
        f"netem delay {delay_str}{loss_clause} limit {bdp_bytes}",
    ]

    # One filter per address family and direction; IP protocol 17 is UDP.
    # NOTE(review): only UDP is matched, so TCP traffic on the port is left
    # unthrottled -- confirm this is intended for non-h3 tests.
    filter_prefixes = (
        (f"sudo tc filter add dev {interface} protocol ip parent 1:0 u32 ", "ip"),
        (f"sudo tc filter add dev {interface} parent 1:0 protocol ipv6 u32 ", "ip6"),
    )
    for prefix, family in filter_prefixes:
        for direction in ("dport", "sport"):
            commands.append(
                f"{prefix}"
                f"match {family} protocol 17 0xff "
                f"match {family} {direction} {port} 0xffff "
                f"flowid 1:1"
            )

    for command in commands:
        if not self.run_command(command):
            return False
    return True
def get_network_conditions(self, cmd):
    """Read the network options out of a command-line argument list.

    The value following ``--network_type`` is stored in
    ``self.network_type`` and the value following ``--pkt_loss_rate`` in
    ``self.packet_loss_rate``. A value consumed this way is not itself
    examined as an option.

    Args:
        cmd: List of command-line arguments.

    Raises:
        Exception: "failed to get network condition" when the arguments
            cannot be processed, e.g. an option is the last element and
            has no value.
    """
    try:
        tokens = iter(cmd)
        for token in tokens:
            if token == "--network_type":
                # next() also advances past the value.
                self.network_type = next(tokens)
            elif token == "--pkt_loss_rate":
                self.packet_loss_rate = next(tokens)
    except Exception:
        raise Exception("failed to get network condition")
def network_type_to_temp_file(self, network_type):
    """Create a random payload file sized for 60 seconds at full bandwidth.

    The file is written to MOZ_FETCHES_DIR and named
    ``temp_file_<size in bytes>B.bin``. "unthrottled" is treated as
    1000 Mbit; any other label is resolved through
    network_type_to_bandwidth_rtt().

    Args:
        network_type: "unthrottled" or a label known to
            network_type_to_bandwidth_rtt().

    Returns:
        tuple: ``(path, size_in_bytes)``. ``path`` is None when the file
        could not be written.

    Raises:
        Exception: When MOZ_FETCHES_DIR does not point to an existing
            directory, or when ``network_type`` is unknown.
    """
    transfer_seconds = 60
    # Payloads can be several gigabytes, so the random data is produced and
    # written in bounded chunks instead of one allocation.
    chunk_size = 1024 * 1024

    fetches_dir = Path(os.environ.get("MOZ_FETCHES_DIR", "None"))
    if not fetches_dir.exists():
        raise Exception("Missing MOZ_FETCHES_DIR")

    if network_type == "unthrottled":
        bandwidth_str = "1000Mbit"
    else:
        bandwidth_str, _ = self.network_type_to_bandwidth_rtt(network_type)
    bandwidth_mbit = float(bandwidth_str.replace("Mbit", ""))

    # Mbit/s * 1e6 = bit/s; * seconds = bits; / 8 = bytes.
    file_size = int(bandwidth_mbit * 1e6 * transfer_seconds / 8)

    # The size in bytes is part of the file name.
    temp_file_path = fetches_dir / f"temp_file_{file_size}B.bin"
    try:
        with open(temp_file_path, "wb") as temp_file:
            remaining = file_size
            while remaining > 0:
                chunk = os.urandom(min(chunk_size, remaining))
                temp_file.write(chunk)
                remaining -= len(chunk)
    except Exception as e:
        LOG.error(f"Failed to create temporary file: {e}")
        # Do not leave a truncated payload behind.
        try:
            temp_file_path.unlink()
        except OSError:
            pass
        return None, file_size

    LOG.info(f"Temporary file created in MOZ_FETCHES_DIR: {temp_file_path}")
    return temp_file_path, file_size
def modify_command(self, cmd, test):
    """Extend the browsertime command line and start the test environment.

    In order, this method:
      1. adds browser options (insecure-certificate acceptance for Firefox,
         HTTP/3 forcing for the chosen port when the test is an h3 test),
      2. reserves a port for Caddy (UDP for h3, TCP otherwise),
      3. reads the network options from ``cmd`` and creates the payload file,
      4. appends the server URL and payload details for the browsertime
         script,
      5. starts the backend server and Caddy,
      6. applies ``tc`` throttling unless the network type is "unthrottled".

    Args:
        cmd: Browsertime command line as a list; modified in place.
            ``cmd[0]`` is taken to be the path of the node binary.
        test: Test description; not used by this method.

    Raises:
        Exception: When the payload file cannot be created, a server fails
            to start, ``tc`` is unavailable, or throttling cannot be applied.
    """
    if not self._is_chrome:
        cmd += [
            "--firefox.acceptInsecureCerts",
            "true",
        ]

    if self.http_version == "h3":
        self.caddy_port = self.find_free_port(socket.SOCK_DGRAM)
        if not self._is_chrome:
            cmd += [
                "--firefox.preference",
                f"network.http.http3.alt-svc-mapping-for-testing:localhost;h3=:{self.caddy_port}",
                "--firefox.preference",
                "network.http.http3.force-use-alt-svc-mapping-for-testing:true",
                "--firefox.preference",
                "network.http.http3.disable_when_third_party_roots_found:false",
            ]
        else:
            spki = "VCIlmPM9NkgFQtrs4Oa5TeFcDu6MWRTKSNdePEhOgD8="
            cmd += [
                f"--chrome.args=--origin-to-force-quic-on=localhost:{self.caddy_port}",
                f"--chrome.args=--ignore-certificate-errors-spki-list={spki}",
            ]
    else:
        self.caddy_port = self.find_free_port(socket.SOCK_STREAM)

    self.get_network_conditions(cmd)

    temp_file_path, file_size = self.network_type_to_temp_file(self.network_type)
    if not temp_file_path:
        raise Exception("Failed to generate temporary file")
    self.cleanup.append(lambda: temp_file_path.unlink())

    # "--browsertime.server_url" needs a value of its own; without one the
    # following flag would be consumed as the URL. The URL follows from
    # start_caddy(): TLS on localhost:<caddy_port>, with the download page
    # served at /download_test.html.
    # NOTE(review): the upload URL is assumed to be the server root --
    # confirm against the browsertime upload script.
    server_url = f"https://localhost:{self.caddy_port}"

    if self.transfer_type == "upload":
        cmd += [
            "--browsertime.server_url",
            server_url,
            "--browsertime.test_file_name",
            temp_file_path.name,
            "--browsertime.test_file_size",
            str(file_size),
        ]
    elif self.transfer_type == "download":
        cmd += [
            "--browsertime.server_url",
            f"{server_url}/download_test.html",
            "--browsertime.test_file_size",
            str(file_size),
        ]

    LOG.info("modify_command: %s" % cmd)

    # We know that cmd[0] is the path to nodejs.
    self.browsertime_node = Path(cmd[0])
    self.backend_server = self.start_backend_server("benchmark_backend_server.js")
    if self.backend_server:
        self.caddy_server = self.start_caddy(temp_file_path)
    # caddy_server is still None if either server failed to start.
    if self.caddy_server is None:
        raise Exception("Failed to start test servers")

    if self.network_type != "unthrottled":
        if not self.check_tc_command():
            raise Exception("tc is not available")
        if not self.apply_network_throttling(
            self.interface,
            self.network_type,
            self.packet_loss_rate,
            self.caddy_port,
        ):
            raise Exception("apply_network_throttling failed")
def handle_result(self, bt_result, raw_result, last_result=False, **kwargs):
    """Copy the bandwidth samples of one raw result into ``bt_result``.

    Samples are read from ``raw_result["extras"][0]["custom_data"]`` under
    the key "upload-bandwidth" (upload tests) or "download-bandwidth" (all
    other tests) and appended, one by one, to the list stored under the same
    key in ``bt_result["measurements"]``. Nothing is changed when no samples
    are found.

    Args:
        bt_result: Result mapping with a ``"measurements"`` dict; updated in
            place.
        raw_result: Raw result mapping to read the samples from.
        last_result: Not used by this method.
        **kwargs: Not used by this method.
    """
    if self.transfer_type == "upload":
        bandwidth_key = "upload-bandwidth"
    else:
        bandwidth_key = "download-bandwidth"

    def get_bandwidth(data):
        # Any structural surprise in `data` is treated as "no samples".
        try:
            extras = data.get("extras", [])
            if not extras or not isinstance(extras, list):
                return None
            custom_data = extras[0].get("custom_data", {})
            if bandwidth_key not in custom_data:
                return None
            return custom_data[bandwidth_key]
        except Exception:
            return None

    samples = get_bandwidth(raw_result)
    if not samples:
        return

    LOG.info(f"Bandwidth: {samples}")
    for sample in samples:
        bt_result["measurements"].setdefault(bandwidth_key, []).append(sample)
def _build_subtest(self, measurement_name, replicates, test):
    """Assemble the subtest entry for one measurement.

    Args:
        measurement_name: Name stored in the entry.
        replicates: Measured values; their mean, rounded to three decimals,
            becomes the entry's value.
        test: Test description supplying the optional settings
            ``subtest_unit``/``unit`` (default "Mbit/s"),
            ``lower_is_better`` (default False) and ``alert_threshold``
            (default 2.0).

    Returns:
        dict: The subtest entry, with alerting switched off.
    """
    # A non-empty subtest_unit wins over unit.
    unit = test.get("subtest_unit") or test.get("unit", "Mbit/s")
    lower_is_better = test.get("lower_is_better", False)
    alert_threshold = float(test.get("alert_threshold", 2.0))
    mean_value = round(filters.mean(replicates), 3)

    subtest = {}
    subtest["name"] = measurement_name
    subtest["lowerIsBetter"] = lower_is_better
    subtest["alertThreshold"] = alert_threshold
    subtest["unit"] = unit
    subtest["replicates"] = replicates
    subtest["shouldAlert"] = False
    subtest["value"] = mean_value
    return subtest
def summarize_test(self, test, suite, **kwargs):
    """Fill ``suite`` with one subtest per non-empty measurement.

    Marks the suite as a benchmark, replaces an empty-dict ``subtests``
    value with a list, appends a subtest for every measurement in
    ``test["measurements"]`` that has replicates, and sorts the subtests by
    name.

    Args:
        test: Test description containing a ``"measurements"`` mapping of
            name to replicates.
        suite: Suite mapping; updated in place.
        **kwargs: Not used by this method.
    """
    suite["type"] = "benchmark"

    # An untouched suite carries {} here; switch to a list before appending.
    if suite["subtests"] == {}:
        suite["subtests"] = []
    subtests = suite["subtests"]

    for measurement_name, replicates in test["measurements"].items():
        if replicates:
            subtests.append(
                self._build_subtest(measurement_name, replicates, test)
            )

    subtests.sort(key=lambda entry: entry["name"])
def summarize_suites(self, suites):
    """Tag every suite that has extra options with the network conditions.

    For each suite containing an ``"extraOptions"`` list, the network type
    and a ``loss-<rate>`` label ("loss-0" when no loss rate is set) are
    appended to that list. Suites without the key are left untouched.

    Args:
        suites: List of suite mappings; updated in place.
    """
    for suite in suites:
        if "extraOptions" not in suite:
            continue
        if self.packet_loss_rate:
            loss_label = f"loss-{self.packet_loss_rate}"
        else:
            loss_label = "loss-0"
        suite["extraOptions"].extend([self.network_type, loss_label])
def shutdown_server(self, name, proc):
    """Send SIGTERM to a server's process group unless it already exited.

    Args:
        name: Label used in the log messages.
        proc: ``Popen``-like object of the server; its pid is used as the
            process group id.
    """
    LOG.info("%s server shutting down ..." % name)

    exit_code = proc.poll()
    if exit_code is not None:
        LOG.info("server already dead %s" % exit_code)
        return

    LOG.info("server pid is %s" % str(proc.pid))
    try:
        os.killpg(proc.pid, signal.SIGTERM)
    except Exception as e:
        # Best effort only: a failed kill is logged, not raised.
        LOG.error("Failed during kill: " + str(e))
def clean_up(self):
    """Stop the helper servers and undo everything registered for cleanup.

    Shuts down Caddy and the backend server (when they were started), waits
    for the threads draining Caddy's output, then runs the callables in
    ``self.cleanup`` in reverse registration order, emptying the list.

    A failing cleanup callable is logged and does not stop the remaining
    ones, so that e.g. a file that is already gone cannot leave network
    throttling in place.
    """
    if self.caddy_server:
        self.shutdown_server("Caddy", self.caddy_server)
    if self.backend_server:
        self.shutdown_server("Backend", self.backend_server)

    # The reader threads finish once Caddy's output pipes reach end of file.
    if self.caddy_stdout:
        self.caddy_stdout.join()
    if self.caddy_stderr:
        self.caddy_stderr.join()

    while self.cleanup:
        func = self.cleanup.pop()
        try:
            func()
        except Exception as e:
            LOG.error(f"Cleanup step failed: {e}")