Source code
Revision control
Copy as Markdown
Other Tools
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
"""
Drives the throttling feature when the test calls our
controlled server.
"""
import http.client
import json
import os
import sys
import time
from urllib.parse import urlparse
from mozperftest.test.browsertime import add_option
from mozperftest.utils import get_tc_secret
ENDPOINTS = {
"linux": "h3.dev.mozaws.net",
"darwin": "h3.mac.dev.mozaws.net",
"win32": "h3.win.dev.mozaws.net",
}
CTRL_SERVER = ENDPOINTS[sys.platform]
TASK_CLUSTER = "TASK_ID" in os.environ.keys()
_SECRET = {
"throttler_key": os.environ.get("WEBNETEM_KEY", ""),
}
if TASK_CLUSTER:
_SECRET.update(get_tc_secret())
if _SECRET["throttler_key"] == "":
if TASK_CLUSTER:
raise Exception("throttler_key not found in secret")
raise Exception("WEBNETEM_KEY not set")
_TIMEOUT = 30
WAIT_TIME = 60 * 10
IDLE_TIME = 10
BREATHE_TIME = 20
class Throttler:
def __init__(self, env, host, key):
self.env = env
self.host = host
self.key = key
self.verbose = env.get_arg("verbose", False)
self.logger = self.verbose and self.env.info or self.env.debug
def log(self, msg):
self.logger("[throttler] " + msg)
def _request(self, action, data=None):
kw = {}
headers = {b"X-WEBNETEM-KEY": self.key}
verb = data is None and "GET" or "POST"
if data is not None:
data = json.dumps(data)
headers[b"Content-type"] = b"application/json"
parsed = urlparse(self.host)
server = parsed.netloc
path = parsed.path
if action != "status":
path += "/" + action
self.log(f"Calling {verb} {path}")
conn = http.client.HTTPSConnection(server, timeout=_TIMEOUT)
conn.request(verb, path, body=data, headers=headers, **kw)
resp = conn.getresponse()
res = resp.read()
if resp.status >= 400:
raise Exception(res)
res = json.loads(res)
return res
def start(self, data=None):
self.log("Starting")
now = time.time()
acquired = False
while time.time() - now < WAIT_TIME:
status = self._request("status")
if status.get("test_running"):
# a test is running
self.log("A test is already controlling the server")
self.log(f"Waiting {IDLE_TIME} seconds")
else:
try:
self._request("start_test")
acquired = True
break
except Exception:
# we got beat in the race
self.log("Someone else beat us")
time.sleep(IDLE_TIME)
if not acquired:
raise Exception("Could not acquire the test server")
if data is not None:
self._request("shape", data)
def stop(self):
self.log("Stopping")
try:
self._request("reset")
finally:
self._request("stop_test")
def get_throttler(env):
host = _SECRET["throttler_host"]
key = _SECRET["throttler_key"].encode()
return Throttler(env, host, key)
_PROTOCOL = "h2", "h3"
_PAGE = "gallery", "news", "shopping", "photoblog"
# set the network condition here.
# each item has a name and some netem options:
#
# loss_ratio: specify percentage of packets that will be lost
# loss_corr: specify a correlation factor for the random packet loss
# dup_ratio: specify percentage of packets that will be duplicated
# delay: specify an overall delay for each packet
# jitter: specify amount of jitter in milliseconds
# delay_jitter_corr: specify a correlation factor for the random jitter
# reorder_ratio: specify percentage of packets that will be reordered
# reorder_corr: specify a correlation factor for the random reordering
#
_THROTTLING = (
{"name": "full"}, # no throttling.
{"name": "one", "delay": "20"},
{"name": "two", "delay": "50"},
{"name": "three", "delay": "100"},
{"name": "four", "delay": "200"},
{"name": "five", "delay": "300"},
)
def get_test():
"""Iterate on test conditions.
For each cycle, we return a combination of: protocol, page, throttling
settings. Each combination has a name, and that name will be used along with
the protocol as a prefix for each metrics.
"""
for proto in _PROTOCOL:
for page in _PAGE:
for throttler_settings in _THROTTLING:
yield proto, page, url, throttler_settings
combo = get_test()
def before_cycle(metadata, env, cycle, script):
global combo
if "throttlable" not in script["tags"]:
return
throttler = get_throttler(env)
try:
proto, page, url, throttler_settings = next(combo)
except StopIteration:
combo = get_test()
proto, page, url, throttler_settings = next(combo)
# setting the url for the browsertime script
add_option(env, "browsertime.url", url, overwrite=True)
# enabling http if needed
if proto == "h3":
add_option(env, "firefox.preference", "network.http.http3.enable:true")
# prefix used to differenciate metrics
name = throttler_settings["name"]
script["name"] = f"{name}_{proto}_{page}"
# throttling the controlled server if needed
if throttler_settings != {"name": "full"}:
env.info("Calling the controlled server")
throttler.start(throttler_settings)
else:
env.info("No throttling for this call")
throttler.start()
def after_cycle(metadata, env, cycle, script):
if "throttlable" not in script["tags"]:
return
throttler = get_throttler(env)
try:
throttler.stop()
except Exception:
pass
# give a chance for a competitive job to take over
time.sleep(BREATHE_TIME)