Revision control

Copy as Markdown

Other Tools

#!/usr/bin/env python
import binascii
import argparse
import re
import subprocess
import sys
import os.path
import logging
import time
from collections import OrderedDict
import multiprocessing
from multiprocessing.pool import ThreadPool
SUPPORTED_ALGORITHMS = {
"AES-128/CFB": "aes-128-cfb",
"AES-192/CFB": "aes-192-cfb",
"AES-256/CFB": "aes-256-cfb",
"AES-128/GCM": "aes-128-gcm",
"AES-192/GCM": "aes-192-gcm",
"AES-256/GCM": "aes-256-gcm",
"AES-128/OCB": "aes-128-ocb",
"AES-128/XTS": "aes-128-xts",
"AES-256/XTS": "aes-256-xts",
"ChaCha20Poly1305": "chacha20poly1305",
}
class VecDocument:
def __init__(self, filepath):
self.data = OrderedDict()
last_testcase_number = 1
current_testcase_number = 1
current_group_name = ""
last_group_name = ""
current_testcase = {}
PATTERN_GROUPHEADER = "^\[(.+)\]$"
PATTERN_KEYVALUE = "^\s*([a-zA-Z]+)\s*=(.*)$"
with open(filepath, 'r') as f:
# Append one empty line to simplify parsing
lines = f.read().splitlines() + ["\n"]
for line in lines:
line = line.strip()
if line.startswith("#"):
pass # Skip
elif line == "":
current_testcase_number += 1
elif re.match(PATTERN_GROUPHEADER, line):
match = re.match(PATTERN_GROUPHEADER, line)
current_group_name = match.group(1)
elif re.match(PATTERN_KEYVALUE, line):
match = re.match(PATTERN_KEYVALUE, line)
key = match.group(1)
value = match.group(2).strip()
current_testcase[key] = value
if current_testcase_number != last_testcase_number:
if not current_group_name in self.data:
self.data[current_group_name] = []
if len(current_testcase) != 0:
self.data[current_group_name].append(current_testcase)
current_testcase = {}
last_testcase_number = current_testcase_number
if current_group_name != last_group_name:
last_group_name = current_group_name
# Reset testcase number
last_testcase_number = 1
current_testcase_number = 1
def get_data(self):
return self.data
TESTS_RUN = 0
TESTS_FAILED = 0
class TestLogHandler(logging.StreamHandler, object):
def emit(self, record):
# Do the default stuff first
super(TestLogHandler, self).emit(record)
if record.levelno >= logging.ERROR:
global TESTS_FAILED
TESTS_FAILED += 1
def setup_logging(options):
if options.verbose:
log_level = logging.DEBUG
elif options.quiet:
log_level = logging.WARNING
else:
log_level = logging.INFO
lh = TestLogHandler(sys.stdout)
lh.setFormatter(logging.Formatter('%(levelname) 7s: %(message)s'))
logging.getLogger().addHandler(lh)
logging.getLogger().setLevel(log_level)
def test_cipher_kat(cli_binary, data):
iv = data['Nonce']
key = data['Key']
ad = data['AD'] if 'AD' in data else ""
plaintext = data['In'].lower()
ciphertext = data['Out'].lower()
algorithm = data['Algorithm']
direction = data['Direction']
mode = SUPPORTED_ALGORITHMS.get(algorithm)
if mode is None:
raise Exception("Unknown algorithm: '" + algorithm + "'")
cmd = [
cli_binary,
"encryption",
"--mode=%s" % mode,
"--iv=%s" % iv,
"--ad=%s" % ad,
"--key=%s" % key]
if direction == "decrypt":
cmd += ['--decrypt']
if direction == "decrypt":
invalue = ciphertext
else:
invalue = plaintext
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
out_raw = p.communicate(input=binascii.unhexlify(invalue))[0]
output = binascii.hexlify(out_raw).decode("UTF-8").lower()
expected = plaintext if direction == "decrypt" else ciphertext
if expected != output:
logging.error("For test %s got %s expected %s" % (data['testname'], output, expected))
def get_testdata(document, max_tests):
out = []
for algorithm in document:
if algorithm in SUPPORTED_ALGORITHMS:
testcase_number = 0
for testcase in document[algorithm]:
testcase_number += 1
for direction in ['encrypt', 'decrypt']:
testname = "{} no {:0>3} ({})".format(
algorithm.lower(), testcase_number, direction)
testname = re.sub("[^a-z0-9-]", "_", testname)
testname = re.sub("_+", "_", testname)
testname = testname.strip("_")
test = {'testname': testname}
for key in testcase:
value = testcase[key]
test[key] = value
test['Algorithm'] = algorithm
test['Direction'] = direction
out.append(test)
if max_tests > 0 and testcase_number > max_tests:
break
return out
def main(args=None):
if args is None:
args = sys.argv
parser = argparse.ArgumentParser(description="")
parser.add_argument('cli_binary', help='path to the botan cli binary')
parser.add_argument('--max-tests', type=int, default=50, metavar="M")
parser.add_argument('--threads', type=int, default=0, metavar="T")
parser.add_argument('--verbose', action='store_true', default=False)
parser.add_argument('--quiet', action='store_true', default=False)
args = parser.parse_args()
setup_logging(args)
cli_binary = args.cli_binary
max_tests = args.max_tests
threads = args.threads
if threads == 0:
threads = multiprocessing.cpu_count()
test_data_dir = os.path.join('src', 'tests', 'data')
mode_test_data = [os.path.join(test_data_dir, 'modes', 'cfb.vec'),
os.path.join(test_data_dir, 'aead', 'gcm.vec'),
os.path.join(test_data_dir, 'aead', 'ocb.vec'),
os.path.join(test_data_dir, 'modes', 'xts.vec'),
os.path.join(test_data_dir, 'aead', 'chacha20poly1305.vec')]
kats = []
for f in mode_test_data:
vecfile = VecDocument(f)
kats += get_testdata(vecfile.get_data(), max_tests)
start_time = time.time()
if threads > 1:
pool = ThreadPool(processes=threads)
results = []
for test in kats:
results.append(pool.apply_async(test_cipher_kat, (cli_binary, test)))
for result in results:
result.get()
else:
for test in kats:
test_cipher_kat(test)
end_time = time.time()
print("Ran %d tests with %d failures in %.02f seconds" % (
len(kats), TESTS_FAILED, end_time - start_time))
if TESTS_FAILED > 0:
return 1
return 0
if __name__ == '__main__':
sys.exit(main())