Source code
Revision control
Copy as Markdown
Other Tools
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import argparse
import ast
import base64
import dataclasses
import hashlib
import itertools
import os
import random
import re
import subprocess
import sys
import threading
# Hex string used as the key of the TLS 1.3 external PSK (tstclnt "-z").
EXTERNAL_PSK = (
    "0x783666676F55306932745A32303354442B394A3271735A7A30714B464B645943"
)

# Base64-encoded ECHConfigs used for Encrypted Client Hello (tstclnt "-N").
ECH_CONFIGS = (
    "AEX+DQBBcQAgACDh4IuiuhhInUcKZx5uYcehlG9PQ1ZlzhvVZyjJl7dscQAEAAEAAQASY2xvdWRmbGFyZS1lY2guY29tAAA="
)

# Flags passed to every tstclnt invocation, regardless of the random extras.
DEFAULT_TSTCLNT_ARGS = [
    # Override bad server cert. Make it OK.
    "-o",
    # Run without a cert database.
    "-D",
    # Quit after handshake.
    "-Q",
    # Load the default "builtins" root CA module.
    "-b",
    # Include PEM format certificate dumps.
    "-CCC",
]

# Lines that open and close a PEM certificate dump in the tstclnt output.
NS_CERT_HEADER = "-----BEGIN CERTIFICATE-----"
NS_CERT_TRAILER = "-----END CERTIFICATE-----"
@dataclasses.dataclass
class HandshakeData:
    """Bytes and certificates collected from one traced tstclnt run.

    Every field is annotated so that ``dataclasses`` treats it as a real
    per-instance field. In particular ``certificates`` is created through a
    ``default_factory``: a plain ``certificates = []`` class attribute would
    be one list shared (and appended to) by every instance.
    """

    # Payload of the traced sendto() calls, concatenated in order.
    client: bytes = b""
    # Payload of the traced recvfrom() calls, concatenated in order.
    server: bytes = b""
    # Base64-decoded bodies of the PEM certificate blocks that were dumped.
    certificates: list[bytes] = dataclasses.field(default_factory=list)
def parse_strace_line(line):
    """Return the bytes of the first all-hex quoted string in *line*.

    The string has to consist solely of ``\\xNN`` escapes (lowercase hex
    digits) between double quotes. ``b""`` is returned when the line
    contains no such string.
    """
    found = re.search(r"\"(\\x[a-f0-9]{2})+\"", line)
    if found is not None:
        # Drop the surrounding quotes and the "\x" prefixes, which leaves
        # nothing but pairs of hex digits.
        hex_digits = found.group(0)[1:-1].replace("\\x", "")
        return bytes.fromhex(hex_digits)
    return b""
def parse_tstclnt_output(output):
    """Collect handshake bytes and certificates from traced tstclnt output.

    Args:
        output: Text containing strace lines and PEM certificate dumps.

    Returns:
        A ``HandshakeData`` whose ``client`` holds the concatenated payload
        of all ``sendto(`` lines, whose ``server`` holds the payload of the
        ``recvfrom(`` lines seen after the first client bytes, and whose
        ``certificates`` holds the Base64-decoded body of every complete
        BEGIN/END CERTIFICATE block, in order of appearance.
    """
    hs_data = HandshakeData()
    # Collect into a list owned by this call, so certificates from separate
    # outputs can never end up in the same list.
    certificates = []
    # None while outside a certificate block; inside one, the Base64 lines
    # gathered so far.
    pem_lines = None

    for line in output.splitlines():
        if line.startswith("sendto("):
            hs_data.client += parse_strace_line(line)
            continue

        # Data received before the client sent anything is not recorded.
        if line.startswith("recvfrom(") and hs_data.client:
            hs_data.server += parse_strace_line(line)
            continue

        if line == NS_CERT_HEADER:
            pem_lines = []
            continue

        if line == NS_CERT_TRAILER:
            # An END marker without a preceding BEGIN has no body to decode.
            if pem_lines is not None:
                certificates.append(base64.b64decode("".join(pem_lines)))
            pem_lines = None
            continue

        if pem_lines is not None:
            pem_lines.append(line)

    hs_data.certificates = certificates
    return hs_data
def get_random_tstclnt_args():
    """Return a random selection of optional tstclnt arguments.

    Each candidate below is included independently, decided by one
    ``random.randint(0, 1)`` draw per candidate, in the listed order.
    """
    candidates = (
        # Use Encrypted Client Hello with the given Base64-encoded ECHConfigs.
        ["-N", ECH_CONFIGS],
        # Configure a TLS 1.3 External PSK with the given hex string for a key.
        ["-z", EXTERNAL_PSK],
        # Enable the session ticket extension.
        ["-u"],
        # Enable the signed_certificate_timestamp extension.
        ["-U"],
        # Enable the delegated credentials extension.
        ["-B"],
        # Enable the extended master secret extension [RFC7627].
        ["-G"],
        # Allow 0-RTT data (TLS 1.3 only).
        ["-Z"],
        # Enable middlebox compatibility mode (TLS 1.3 only).
        ["-e"],
        ["--enable-rfc8701-grease"],
        ["--enable-ch-extension-permutation"],
        ["--zlib-certificate-compression"],
    )

    selected = []
    for option in candidates:
        if random.randint(0, 1):
            selected.extend(option)
    return selected
def _write_corpus_entry(output_dir, corpus_name, data):
    """Write *data* to ``<output_dir>/<corpus_name>/<SHA-1 hex digest of data>``."""
    filename = hashlib.sha1(data).hexdigest()
    filepath = os.path.join(output_dir, corpus_name, filename)
    with open(filepath, "wb") as f:
        f.write(data)


def brrrrr(hosts, args):
    """Handshake with every host via tstclnt and store the captured data.

    For each host, tstclnt is run under strace with the default arguments
    plus a random selection of optional ones. The bytes and certificates
    parsed from the child's stderr are written into the corpus directories
    below ``args.output``, each file named after the SHA-1 of its content.

    Args:
        hosts: Iterable of host names, each passed to ``tstclnt -h``.
        args: Namespace providing ``nss_build`` (directory containing
            ``bin/tstclnt`` and ``lib``) and ``output`` (directory that
            already contains the three corpus directories).
    """
    tstclnt_bin = os.path.join(args.nss_build, "bin/tstclnt")
    ld_library_path = os.path.join(args.nss_build, "lib")
    strace_cmd = [
        "strace", "-f", "-x", "-s", "65535", "-e", "trace=network",
        tstclnt_bin,
    ]

    for host in hosts:
        tstclnt_args = get_random_tstclnt_args()
        try:
            result = subprocess.run(
                strace_cmd + ["-h", host] + DEFAULT_TSTCLNT_ARGS + tstclnt_args,
                # The child's environment consists of this variable only.
                env={
                    "LD_LIBRARY_PATH": ld_library_path,
                },
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=2,
                text=True,
                # Bytes that cannot be decoded must not abort the rest of
                # the batch with a UnicodeDecodeError.
                errors="replace")
        except subprocess.TimeoutExpired:
            print("Getting handshake timed out for:", host, file=sys.stderr)
            continue

        hs_data = parse_tstclnt_output(result.stderr)

        # What the client sent is stored in the *server* corpus and what it
        # received in the *client* corpus -- presumably because each corpus
        # holds the input consumed by that side's fuzz target.
        if hs_data.client:
            _write_corpus_entry(args.output, "tls-server-corpus",
                                hs_data.client)
        if hs_data.server:
            _write_corpus_entry(args.output, "tls-client-corpus",
                                hs_data.server)
        for certificate in hs_data.certificates:
            _write_corpus_entry(args.output, "quickder-corpus", certificate)
def main():
    """Parse the command line and collect handshakes using worker threads.

    The hosts file is read one host per line (blank lines are ignored), the
    three corpus directories are created below ``--output``, and the hosts
    are split into at most ``--threads`` batches, each handled by its own
    thread running ``brrrrr``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--nss-build",
                        required=True,
                        type=str,
                        help="e.g. /path/to/dist/Debug")
    parser.add_argument("--hosts", required=True, type=str)
    parser.add_argument("--threads", required=True, type=int)
    parser.add_argument("--output", required=True, type=str)

    args = parser.parse_args()
    if args.threads < 1:
        parser.error("--threads must be at least 1")

    with open(args.hosts, "r") as f:
        # Skip blank lines so that no handshake is attempted with an empty
        # host name.
        hosts = [line.strip() for line in f.read().splitlines() if line.strip()]

    # For use in automation (e.g. MozillaSecurity/orion), the output
    # corpus directories should follow the following scheme: $name-corpus.
    os.makedirs(os.path.join(args.output, "quickder-corpus"), exist_ok=True)
    os.makedirs(os.path.join(args.output, "tls-server-corpus"), exist_ok=True)
    os.makedirs(os.path.join(args.output, "tls-client-corpus"), exist_ok=True)

    if not hosts:
        print("No hosts found in:", args.hosts, file=sys.stderr)
        return

    # Ceiling division: the batch size is never 0 (which itertools.batched
    # rejects) and the number of batches never exceeds --threads.
    batch_size = -(-len(hosts) // args.threads)

    threads = []
    for batch in itertools.batched(hosts, batch_size):
        thread = threading.Thread(target=brrrrr, args=(
            batch,
            args,
        ))
        thread.daemon = True
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()