From 22ed23f071266cd68b8eff8ca2862e1bc2c043b2 Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Wed, 3 Aug 2016 00:01:27 +0200 Subject: [PATCH] TLS version (in)tolerance scanner Since it is impossible to make openssl command line tool send TLSv1.3 Client Hello message, add a python based tool to perform TLS version intolerance scan --- .gitignore | 8 + cipherscan | 20 ++- cscan.py | 259 +++++++++++++++++++++++++++++++++ cscan.sh | 26 ++++ cscan/__init__.py | 0 cscan/config.py | 134 +++++++++++++++++ cscan/constants.py | 165 +++++++++++++++++++++ cscan/extensions.py | 198 +++++++++++++++++++++++++ cscan/messages.py | 256 ++++++++++++++++++++++++++++++++ cscan/modifiers.py | 29 ++++ cscan/scanner.py | 157 ++++++++++++++++++++ cscan_tests/__init__.py | 0 cscan_tests/test_config.py | 50 +++++++ cscan_tests/test_extensions.py | 76 ++++++++++ top1m/parse_results.py | 26 ++++ 15 files changed, 1403 insertions(+), 1 deletion(-) create mode 100644 cscan.py create mode 100755 cscan.sh create mode 100644 cscan/__init__.py create mode 100644 cscan/config.py create mode 100644 cscan/constants.py create mode 100644 cscan/extensions.py create mode 100644 cscan/messages.py create mode 100644 cscan/modifiers.py create mode 100644 cscan/scanner.py create mode 100644 cscan_tests/__init__.py create mode 100644 cscan_tests/test_config.py create mode 100644 cscan_tests/test_extensions.py diff --git a/.gitignore b/.gitignore index 04ab27c..77184aa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,10 @@ mozilla/* top1m/results/* +tlslite +.tlslite-ng +*/__pycache__ +*.pyc +.coverage +.python-ecdsa +ecdsa +tlslite diff --git a/cipherscan b/cipherscan index 9040cc6..a008feb 100755 --- a/cipherscan +++ b/cipherscan @@ -965,6 +965,8 @@ display_results_in_terminal() { done | sort fi fi + + echo "$cscan_tests" } display_results_in_json() { @@ -1064,7 +1066,12 @@ display_results_in_json() { echo -n "}" ctr=$((ctr+1)) done - echo '}}' + echo -n '}' + if [[ -n $cscan_tests ]]; then + echo -n ',"intolerancies":' + echo -n "$cscan_tests" + fi + echo '}' } test_serverside_ordering() { @@ -1534,6 +1541,17 @@ test_tls_tolerance() { tls_tolerance['small-SSLv3']="True $current_protocol $current_cipher $current_trusted" fi fi + + # finally run the Python based test to perform more precise scan + if [[ "$OUTPUTFORMAT" == "json" ]]; then + cscan_tests="$($DIRNAMEPATH/cscan.sh -j $TARGET $sni_target)" + else + if [[ $VERBOSE != 0 ]]; then + cscan_tests="$($DIRNAMEPATH/cscan.sh -v --no-header $TARGET $sni_target)" + else + cscan_tests="$($DIRNAMEPATH/cscan.sh --no-header $TARGET $sni_target)" + fi + fi } test_kex_sigalgs() { diff --git a/cscan.py b/cscan.py new file mode 100644 index 0000000..8314c09 --- /dev/null +++ b/cscan.py @@ -0,0 +1,259 @@ +# Copyright 2016(c) Hubert Kario +# This work is released under the Mozilla Public License Version 2.0 +"""tlslite-ng based server configuration (and bug) scanner.""" + +from __future__ import print_function +from tlslite.messages import ClientHello, ServerHello, ServerHelloDone, Alert +from tlslite.constants import CipherSuite, \ + AlertLevel +import sys +import json +import getopt +import itertools + +from cscan.scanner import Scanner +from cscan.config import Firefox_42 +from cscan.modifiers import set_hello_version + + +def scan_with_config(host, port, conf, hostname, __sentry=None, __cache={}): + """Connect to server and return set of exchanged messages.""" + assert __sentry is None + key = (host, port, conf, hostname) + if key in __cache: + return __cache[key] + + scanner = Scanner(conf, host, port, hostname) + ret = scanner.scan() + __cache[key] = ret + if verbose and not json_out: + print(".", end='') + sys.stdout.flush() + return ret + + +def simple_inspector(result): + """ + Perform simple check to see if connection was successful. + + Returns True is connection was successful, server replied with + ServerHello and ServerHelloDone messages, and the cipher selected + was present in ciphers advertised by client, False otherwise + """ + if any(isinstance(x, ServerHelloDone) for x in result): + ch = next((x for x in result if isinstance(x, ClientHello)), None) + sh = next((x for x in result if isinstance(x, ServerHello)), None) + if ch and sh: + if sh.cipher_suite not in ch.cipher_suites: + # FAILURE cipher suite mismatch + return False + return True + # incomplete response or error + return False + + +def verbose_inspector(desc, result): + """Describe the connection result in human-readable form.""" + ret = "{0}:".format(desc) + if any(isinstance(x, ServerHelloDone) for x in result): + ch = next((x for x in result if isinstance(x, ClientHello)), None) + sh = next((x for x in result if isinstance(x, ServerHello)), None) + if sh and ch: + if sh.cipher_suite not in ch.cipher_suites: + ret += " FAILURE cipher suite mismatch" + return ret + name = CipherSuite.ietfNames[sh.cipher_suite] \ + if sh.cipher_suite in CipherSuite.ietfNames \ + else hex(sh.cipher_suite) + ret += " ok: {0}, {1}".format(sh.server_version, + name) + return ret + ret += " FAILURE " + errors = [] + for msg in result: + if isinstance(msg, ClientHello): + continue + # check if returned message supports custom formatting + if msg.__class__.__format__ is not object.__format__: + errors += ["{:vxm}".format(msg)] + else: + errors += [repr(msg)] + # skip printing close errors after fatal alerts, they are expected + if isinstance(msg, Alert) and msg.level == AlertLevel.fatal: + break + ret += "\n".join(errors) + return ret + +configs = {} + + +def load_configs(): + """Load known client configurations for later use in scanning.""" + base_configs = [Firefox_42] + for conf in base_configs: + for version in ((3, 1), (3, 2), (3, 3), (3, 4), (3, 5), (3, 254)): + if conf().version != version: + # just changed version + gen = set_hello_version(conf(), version) + if gen.record_version > version: + gen.record_version = version + configs[gen.name] = gen + + # Firefox 42 configs + gen = Firefox_42() + configs[gen.name] = gen + + +def scan_TLS_intolerancies(host, port, hostname): + """Look for intolerancies (version, extensions, ...) in a TLS server.""" + results = {} + + def result_iterator(predicate): + """ + Selecting iterator over cached results. + + Looks for matching result from already performed scans + """ + return (not simple_inspector(results[name]) for name in results + if predicate(configs[name])) + + def result_cache(name, conf): + """Perform scan if config is not in results, caches result.""" + return results[name] if name in results \ + else results.setdefault(name, scan_with_config(host, port, conf, + hostname)) + + def conf_iterator(predicate): + """ + Caching, selecting iterator over configs. + + Returns an iterator that will go over configs that match the provided + predicate (a function that returns true or false depending if given + config is ok for test at hand) while saving the results to the + cache/verbose `results` log/dictionary + """ + scan_iter = (not simple_inspector(result_cache(name, conf)) + for name, conf in configs.items() + if predicate(conf)) + return itertools.chain(result_iterator(predicate), scan_iter) + + host_up = not all(conf_iterator(lambda conf: True)) + + intolerancies = {} + if not host_up: + if json_out: + print(json.dumps(intolerancies)) + else: + print("Host does not seem to support SSL or TLS protocol") + return + + intolerancies["SSL 3.254"] = all(conf_iterator(lambda conf: + conf.version == (3, 254))) + intolerancies["TLS 1.4"] = all(conf_iterator(lambda conf: + conf.version == (3, 5))) + intolerancies["TLS 1.3"] = all(conf_iterator(lambda conf: + conf.version == (3, 4))) + intolerancies["TLS 1.2"] = all(conf_iterator(lambda conf: + conf.version == (3, 3))) + intolerancies["TLS 1.1"] = all(conf_iterator(lambda conf: + conf.version == (3, 2))) + intolerancies["TLS 1.0"] = all(conf_iterator(lambda conf: + conf.version == (3, 1))) + + if json_out: + print(json.dumps(intolerancies)) + else: + if not no_header: + if verbose: + print() + print("Host {0}:{1} scan complete".format(host, port)) + if hostname: + print("SNI hostname used: {0}".format(hostname)) + if verbose: + print() + print("Individual probe results:") + for desc, ret in sorted(results.items()): + print(verbose_inspector(desc, ret)) + + print() + print("Intolerance to:") + for intolerance, value in sorted(intolerancies.items()): + print(" {0:20}: {1}".format(intolerance, + "PRESENT" if value else "absent")) + + +def single_probe(name): + """Run a single probe against a server, print result.""" + print(verbose_inspector(name, scan_with_config(host, port, + configs[name], hostname))) + + +def usage(): + """Print usage information.""" + print("./cscan.py [ARGUMENTS] host[:port] [SNI-HOST-NAME]") + print() + print("-l, --list List probe names") + print("-p name, --probe Run just a single probe") + print("-j, --json Output in JSON format") + print("-v, --verbose Use verbose output") + +if __name__ == "__main__": + try: + opts, args = getopt.getopt(sys.argv[1:], + "jvhlp:", + ["json", "verbose", "help", "list", + "probe=", "no-header"]) + except getopt.GetoptError as err: + print(err) + usage() + sys.exit(2) + + json_out = False + verbose = False + list_probes = False + run_probe = None + no_header = False + + for opt, arg in opts: + if opt in ('-j', '--json'): + json_out = True + elif opt in ('-v', '--verbose'): + verbose = True + elif opt in ('-h', '--help'): + usage() + sys.exit(0) + elif opt in ('-l', '--list'): + list_probes = True + elif opt in ('-p', '--probe'): + run_probe = arg + elif opt in ('--no-header', ): + no_header = True + else: + raise AssertionError("Unknown option {0}".format(opt)) + + if len(args) > 2: + print("Too many arguments") + usage() + sys.exit(2) + + load_configs() + + if list_probes: + for desc, ret in sorted(configs.items()): + print("{0}: {1}".format(desc, ret.__doc__)) + sys.exit(0) + + hostname = None + if len(args) == 2: + hostname = args[1] + hostaddr = args[0].split(":") + if len(hostaddr) > 1: + host, port = hostaddr + else: + host = hostaddr[0] + port = 443 + + if run_probe: + single_probe(run_probe) + else: + scan_TLS_intolerancies(host, port, hostname) diff --git a/cscan.sh b/cscan.sh new file mode 100755 index 0000000..e0a3b01 --- /dev/null +++ b/cscan.sh @@ -0,0 +1,26 @@ +#!/bin/bash +pushd "$(dirname ${BASH_SOURCE[0]})" > /dev/null +if [ ! -d ./tlslite ]; then + git clone --depth=1 https://github.com/tomato42/tlslite-ng.git .tlslite-ng + ln -s .tlslite-ng/tlslite tlslite +fi +if [ ! -d ./ecdsa ]; then + git clone --depth=1 https://github.com/warner/python-ecdsa.git .python-ecdsa + ln -s .python-ecdsa/ecdsa ecdsa +fi + +# update the code if it is running in interactive terminal +#if [[ -t 1 ]]; then +if [[ $UPDATE ]]; then + pushd .tlslite-ng >/dev/null + git pull origin master --quiet + popd >/dev/null + pushd .python-ecdsa >/dev/null + git pull origin master --quiet + popd >/dev/null +fi + +PYTHONPATH=. python cscan.py "$@" +ret=$? +popd > /dev/null +exit $ret diff --git a/cscan/__init__.py b/cscan/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cscan/config.py b/cscan/config.py new file mode 100644 index 0000000..5e14eae --- /dev/null +++ b/cscan/config.py @@ -0,0 +1,134 @@ +# Copyright (c) 2016 Hubert Kario +# Released under Mozilla Public License Version 2.0 + +"""Typical Client Hello messages sent by different clients.""" + +import random +from tlslite.messages import ClientHello +from tlslite.constants import \ + ECPointFormat, HashAlgorithm, SignatureAlgorithm +from tlslite.extensions import SNIExtension, SupportedGroupsExtension, \ + TLSExtension, SignatureAlgorithmsExtension, NPNExtension, \ + ECPointFormatsExtension +from tlslite.utils.cryptomath import numberToByteArray +from .constants import CipherSuite, ExtensionType, GroupName + + +class HelloConfig(object): + """Base object for all Client Hello configurations.""" + + def __init__(self): + """Initialize object with default settings.""" + self._name = None + self.modifications = [] + self.callbacks = [] + self.version = (3, 3) + self.record_version = (3, 0) + self.ciphers = [] + self.extensions = None + self.random = None + self.session_id = bytearray(0) + self.compression_methods = [0] + self.ssl2 = False + + @property + def name(self): + """Return the name of config with all the modifications applied.""" + if self.modifications: + return "{0} ({1})".format(self._name, + ", ".join(self.modifications)) + else: + return self._name + + @name.setter + def name(self, value): + """Set the base name of the configuration.""" + self._name = value + + def __call__(self, hostname): + """Generate a client hello object, use hostname in SNI extension.""" + # SNI is special in that we don't want to send it if it is empty + if self.extensions: + sni = next((x for x in self.extensions + if isinstance(x, SNIExtension)), + None) + if sni: + if hostname is not None: + if sni.serverNames is None: + sni.serverNames = [] + sni.hostNames = [hostname] + else: + # but if we were not provided with a host name, we want + # to remove empty extension + if sni.serverNames is None: + self.extensions = [x for x in self.extensions + if not isinstance(x, SNIExtension)] + + if self.random: + rand = self.random + else: + # we're not doing any crypto with it, just need "something" + # TODO: place unix time at the beginning + rand = numberToByteArray(random.getrandbits(256), 32) + + ch = ClientHello(self.ssl2).create(self.version, rand, self.session_id, + self.ciphers, + extensions=self.extensions) + ch.compression_methods = self.compression_methods + for cb in self.callbacks: + ch = cb(ch) + return ch + + +class Firefox_42(HelloConfig): + """Create Client Hello like Firefox 42.""" + + def __init__(self): + """Set the configuration to Firefox 42.""" + super(Firefox_42, self).__init__() + self._name = "Firefox 42" + self.version = (3, 3) + self.record_version = (3, 1) + self.ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, + CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, + CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, + CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA, + CipherSuite.TLS_DHE_RSA_WITH_AES_256_CBC_SHA, + CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA, + CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA, + CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA] + ext = self.extensions = [] + ext.append(SNIExtension()) + ext.append(TLSExtension(extType=ExtensionType.renegotiation_info) + .create(bytearray(1))) + ext.append(SupportedGroupsExtension().create([GroupName.secp256r1, + GroupName.secp384r1, + GroupName.secp521r1])) + ext.append(ECPointFormatsExtension() + .create([ECPointFormat.uncompressed])) + ext.append(TLSExtension(extType=ExtensionType.session_ticket)) + ext.append(NPNExtension()) + ext.append(TLSExtension(extType=ExtensionType.alpn) + .create(bytearray(b'\x00\x15' + + b'\x02' + b'h2' + + b'\x08' + b'spdy/3.1' + + b'\x08' + b'http/1.1'))) + ext.append(TLSExtension(extType=ExtensionType.status_request) + .create(bytearray(b'\x01' + + b'\x00\x00' + + b'\x00\x00'))) + sig_algs = [] + for alg in ['sha256', 'sha384', 'sha512', 'sha1']: + sig_algs.append((getattr(HashAlgorithm, alg), + SignatureAlgorithm.rsa)) + for alg in ['sha256', 'sha384', 'sha512', 'sha1']: + sig_algs.append((getattr(HashAlgorithm, alg), + SignatureAlgorithm.ecdsa)) + for alg in ['sha256', 'sha1']: + sig_algs.append((getattr(HashAlgorithm, alg), + SignatureAlgorithm.dsa)) + ext.append(SignatureAlgorithmsExtension() + .create(sig_algs)) diff --git a/cscan/constants.py b/cscan/constants.py new file mode 100644 index 0000000..267fcae --- /dev/null +++ b/cscan/constants.py @@ -0,0 +1,165 @@ +# Copyright 2016(c) Hubert Kario +# This work is released under the Mozilla Public License Version 2.0 +"""Extend the tlslite-ng constants with values it does not support.""" + +import tlslite.constants + +from tlslite.constants import CipherSuite + +CipherSuite.ecdheEcdsaSuites = [] + +# RFC 5289 +CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384 = 0xC02C +CipherSuite.ietfNames[0xC02C] = 'TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384' +CipherSuite.ecdheEcdsaSuites.append(CipherSuite. + TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384) + +CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 = 0xC02B +CipherSuite.ietfNames[0xC02B] = 'TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256' +CipherSuite.ecdheEcdsaSuites.append(CipherSuite. + TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256) + +CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384 = 0xC024 +CipherSuite.ietfNames[0xC024] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384' +CipherSuite.ecdheEcdsaSuites.append(CipherSuite. + TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384) + +CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256 = 0xC023 +CipherSuite.ietfNames[0xC023] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256' +CipherSuite.ecdheEcdsaSuites.append(CipherSuite. + TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256) + +# RFC 4492 +CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA = 0xC00A +CipherSuite.ietfNames[0xC00A] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA' +CipherSuite.ecdheEcdsaSuites.append(CipherSuite. + TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA) + +CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA = 0xC009 +CipherSuite.ietfNames[0xC009] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA' +CipherSuite.ecdheEcdsaSuites.append(CipherSuite. + TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA) + +# RFC 7251 +CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CCM = 0xC0Ad +CipherSuite.ietfNames[0xC0AD] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CCM' +CipherSuite.ecdheEcdsaSuites.append(CipherSuite. + TLS_ECDHE_ECDSA_WITH_AES_256_CCM) + +CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM = 0xC0AC +CipherSuite.ietfNames[0xC0AC] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CCM' +CipherSuite.ecdheEcdsaSuites.append(CipherSuite. + TLS_ECDHE_ECDSA_WITH_AES_128_CCM) + +CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8 = 0xC0AF +CipherSuite.ietfNames[0xC0AF] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8' +CipherSuite.ecdheEcdsaSuites.append(CipherSuite. + TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8) + +CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8 = 0xC0AE +CipherSuite.ietfNames[0xC0AE] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8' +CipherSuite.ecdheEcdsaSuites.append(CipherSuite. + TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8) + +CipherSuite.ecdhAllSuites.extend(CipherSuite.ecdheEcdsaSuites) +CipherSuite.certAllSuites.extend(CipherSuite.ecdheEcdsaSuites) + +# obsolete stuff +CipherSuite.TLS_RSA_WITH_DES_CBC_SHA = 0x0009 +CipherSuite.ietfNames[0x0009] = 'TLS_RSA_WITH_DES_CBC_SHA' + +CipherSuite.TLS_RSA_EXPORT1024_WITH_RC4_56_SHA = 0x0064 +CipherSuite.ietfNames[0x0064] = 'TLS_RSA_EXPORT1024_WITH_RC4_56_SHA' +CipherSuite.TLS_RSA_EXPORT1024_WITH_DES_CBC_SHA = 0x0062 +CipherSuite.ietfNames[0x0062] = 'TLS_RSA_EXPORT1024_WITH_DES_CBC_SHA' +CipherSuite.TLS_RSA_EXPORT_WITH_RC4_40_MD5 = 0x0003 +CipherSuite.ietfNames[0x0003] = 'TLS_RSA_EXPORT_WITH_RC4_40_MD5' +CipherSuite.TLS_RSA_EXPORT_WITH_RC2_CBC_40_MD5 = 0x0006 +CipherSuite.ietfNames[0x0006] = 'TLS_RSA_EXPORT_WITH_RC2_CBC_40_MD5' + +# DSS +CipherSuite.dheDssSuites = [] + +CipherSuite.TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA = 0x0013 +CipherSuite.ietfNames[0x0013] = 'TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA' +CipherSuite.dheDssSuites.append(CipherSuite. + TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA) + +CipherSuite.TLS_DHE_DSS_WITH_DES_CBC_SHA = 0x0012 +CipherSuite.ietfNames[0x0012] = 'TLS_DHE_DSS_WITH_DES_CBC_SHA' +CipherSuite.dheDssSuites.append(CipherSuite. + TLS_DHE_DSS_WITH_DES_CBC_SHA) + +CipherSuite.TLS_DHE_DSS_EXPORT1024_WITH_DES_CBC_SHA = 0x0063 +CipherSuite.ietfNames[0x0063] = 'TLS_DHE_DSS_EXPORT1024_WITH_DES_CBC_SHA' +CipherSuite.dheDssSuites.append(CipherSuite. + TLS_DHE_DSS_EXPORT1024_WITH_DES_CBC_SHA) + +CipherSuite.TLS_DHE_DSS_WITH_AES_128_CBC_SHA = 0x0032 +CipherSuite.ietfNames[0x0032] = 'TLS_DHE_DSS_WITH_AES_128_CBC_SHA' +CipherSuite.dheDssSuites.append(CipherSuite. + TLS_DHE_DSS_WITH_AES_128_CBC_SHA) + +CipherSuite.TLS_DHE_DSS_WITH_AES_256_CBC_SHA = 0x0038 +CipherSuite.ietfNames[0x0038] = 'TLS_DHE_DSS_WITH_AES_256_CBC_SHA' +CipherSuite.dheDssSuites.append(CipherSuite. + TLS_DHE_DSS_WITH_AES_256_CBC_SHA) + +CipherSuite.TLS_DHE_DSS_WITH_AES_128_CBC_SHA256 = 0x0040 +CipherSuite.ietfNames[0x0040] = 'TLS_DHE_DSS_WITH_AES_128_CBC_SHA256' +CipherSuite.dheDssSuites.append(CipherSuite. + TLS_DHE_DSS_WITH_AES_128_CBC_SHA256) + +CipherSuite.TLS_DHE_DSS_WITH_AES_256_CBC_SHA256 = 0x006a +CipherSuite.ietfNames[0x006a] = 'TLS_DHE_DSS_WITH_AES_256_CBC_SHA256' +CipherSuite.dheDssSuites.append(CipherSuite. + TLS_DHE_DSS_WITH_AES_256_CBC_SHA256) + + +class ExtensionType(tlslite.constants.ExtensionType): + """Definitions of TLS extension IDs.""" + + status_request = 5 + alpn = 16 + session_ticket = 35 + + heartbeat = 15 # RFC 6520 + status_request_v2 = 17 # RFC 6961 + padding = 21 # RFC 7685 + max_fragment_legth = 1 # RFC 6066 + + # From: Eric Rescorla + # Date: Mon, 7 Dec 2015 05:36:22 -0800 + # [TLS] TLS 1.3 ServerConfiguration + early_data = 40 + pre_shared_key = 41 + key_share = 42 + cookie = 43 + + +class GroupName(tlslite.constants.GroupName): + """ECDH and FFDH key exchange group names.""" + + allEC = list(tlslite.constants.GroupName.allEC) + allFF = list(tlslite.constants.GroupName.allFF) + + ecdh_x25519 = 29 + allEC.append(ecdh_x25519) + + ecdh_x448 = 30 + allEC.append(ecdh_x448) + + eddsa_ed25519 = 31 + allEC.append(eddsa_ed25519) + + eddsa_ed448 = 32 + allEC.append(eddsa_ed448) + + all = allEC + allFF + + +class HandshakeType(tlslite.constants.HandshakeType): + """Type of messages in Handshake protocol.""" + + certificate_status = 22 + session_ticket = 4 diff --git a/cscan/extensions.py b/cscan/extensions.py new file mode 100644 index 0000000..8495579 --- /dev/null +++ b/cscan/extensions.py @@ -0,0 +1,198 @@ +# Copyright 2016(c) Hubert Kario +# This work is released under the Mozilla Public License Version 2.0 + +"""Extra TLS extensions.""" + +import tlslite.extensions +from tlslite.utils.codec import Writer +from tlslite.utils.compat import b2a_hex +from .constants import ExtensionType, GroupName +import .messages + +# make TLSExtensions hashable (__eq__ is already defined in base class) +tlslite.extensions.TLSExtension.__hash__ = lambda self: hash(self.extType) ^ \ + hash(bytes(self.extData)) + + +class RenegotiationExtension(tlslite.extensions.TLSExtension): + """Secure Renegotiation extension RFC 5746.""" + + def __init__(self): + """Initialize secure renegotiation extension.""" + super(RenegotiationExtension, self).__init__( + extType=ExtensionType.renegotiation_info) + self.renegotiated_connection = None + + def create(self, data): + """Set the value of the Finished message.""" + self.renegotiated_connection = data + + @property + def extData(self): + """Serialise the extension.""" + if self.renegotiated_connection is None: + return bytearray(0) + + writer = Writer() + writer.addVarSeq(self.renegotiated_connection, 1, 1) + return writer.bytes + + def parse(self, parser): + """Deserialise the extension from binary data.""" + if parser.getRemainingLength() == 0: + self.renegotiated_connection = None + return + + self.renegotiated_connection = parser.getVarBytes(1) + return self + + def __repr__(self): + """Human readable representation of extension.""" + return "RenegotiationExtension(renegotiated_connection={0!r})"\ + .format(self.renegotiated_connection) + + def __format__(self, formatstr): + """Formatted representation of extension.""" + data = messages.format_bytearray(self.renegotiated_connection, + formatstr) + return "RenegotiationExtension(renegotiated_connection={0})"\ + .format(data) + +tlslite.extensions.TLSExtension._universalExtensions[ + ExtensionType.renegotiation_info] = RenegotiationExtension + + +class SessionTicketExtension(tlslite.extensions.TLSExtension): + """Session Ticket extension (a.k.a. OCSP staple).""" + + def __init__(self): + """Create Session Ticket extension.""" + super(SessionTicketExtension, self).__init__( + extType=ExtensionType.session_ticket) + self.data = bytearray(0) + + def parse(self, parser): + """Deserialise the extension from binary data.""" + self.data = parser.bytes + return self + + def __format__(self, formatstr): + """Print extension data in human-readable form.""" + data = messages.format_bytearray(self.data, formatstr) + return "SessionTicketExtension(data={0})".format(data) + +tlslite.extensions.TLSExtension._universalExtensions[ + ExtensionType.session_ticket] = SessionTicketExtension + + +class ServerStatusRequestExtension(tlslite.extensions.TLSExtension): + """Server Status Request extension.""" + + def __init__(self): + """Create server status request extension.""" + super(ServerStatusRequestExtension, self).__init__( + extType=ExtensionType.status_request) + + def parse(self, parser): + """Deserialise the extension from binary data.""" + if parser.getRemainingLength() != 0: + raise SyntaxError() # FIXME + return self + + def __repr__(self): + """Human readable representation of the object.""" + return "ServerStatusRequestExtension()" + +tlslite.extensions.TLSExtension._serverExtensions[ + ExtensionType.status_request] = ServerStatusRequestExtension + + +class KeyShareExtension(tlslite.extensions.TLSExtension): + """TLS1.3 extension for handling key negotiation.""" + + def __init__(self): + """Create key share extension object.""" + super(KeyShareExtension, self).__init__( + extType=ExtensionType.key_share) + self.client_shares = None + + def create(self, shares): + """ + Set the list of key shares to send. + + @type shares: list of tuples + @param shares: a list of tuples where the first element is a NamedGroup + ID while the second element in a tuple is an opaque bytearray encoding + of the key share. + """ + self.client_shares = shares + return self + + @property + def extData(self): + """Serialise the extension.""" + if self.client_shares is None: + return bytearray(0) + + writer = Writer() + for group_id, share in self.client_shares: + writer.add(group_id, 2) + if group_id in GroupName.allFF: + share_length_length = 2 + else: + share_length_length = 1 + writer.addVarSeq(share, 1, share_length_length) + ext_writer = Writer() + ext_writer.add(len(writer.bytes), 2) + ext_writer.bytes += writer.bytes + return ext_writer.bytes + + def parse(self, parser): + """Deserialise the extension.""" + if parser.getRemainingLength() == 0: + self.client_shares = None + return + + self.client_shares = [] + + parser.startLengthCheck(2) + while not parser.atLengthCheck(): + group_id = parser.get(2) + if group_id in GroupName.allFF: + share_length_length = 2 + else: + share_length_length = 1 + share = parser.getVarBytes(share_length_length) + self.client_shares.append((group_id, share)) + + return self + + def __repr__(self): + """Human readble representation of extension.""" + return "KeyShareExtension({0!r})".format(self.client_shares) + + def __format__(self, formatstr): + """Formattable representation of extension.""" + if self.client_shares is None: + return "KeyShareExtension(None)" + + verbose = "" + hexlify = False + if 'v' in formatstr: + verbose = "GroupName." + if 'h' in formatstr: + hexlify = True + + shares = [] + for group_id, share in self.client_shares: + if hexlify: + share = b2a_hex(share) + else: + share = repr(share) + shares += ["({0}{1}, {2})".format(verbose, + GroupName.toStr(group_id), + share)] + return "KeyShareExtension([" + ",".join(shares) + "])" + +tlslite.extensions.TLSExtension._universalExtensions[ + ExtensionType.key_share] = KeyShareExtension diff --git a/cscan/messages.py b/cscan/messages.py new file mode 100644 index 0000000..7c84bc2 --- /dev/null +++ b/cscan/messages.py @@ -0,0 +1,256 @@ +# Copyright (c) 2016 Hubert Kario +# Released under Mozilla Public License 2.0 + +"""Extensions and modification of the tlslite-ng messages classes.""" + +import tlslite.messages as messages +from tlslite.utils.compat import b2a_hex +from tlslite.constants import ContentType, CertificateType, ECCurveType, \ + HashAlgorithm, SignatureAlgorithm +from tlslite.x509certchain import X509CertChain +from tlslite.utils.cryptomath import secureHash +from .constants import HandshakeType, CipherSuite, GroupName + +# gotta go fast +# comparing client hello's using ClientHello.write() is painfully slow +# monkey patch in faster compare methods + + +def __CH_eq_fun(self, other): + """ + Check if the other is equal to the object. + + always returns false if other is not a ClientHello object + """ + if not isinstance(other, messages.ClientHello): + return False + + return self.ssl2 == other.ssl2 and \ + self.client_version == other.client_version and \ + self.random == other.random and \ + self.session_id == other.session_id and \ + self.cipher_suites == other.cipher_suites and \ + self.compression_methods == other.compression_methods and \ + self.extensions == other.extensions + +messages.ClientHello.__eq__ = __CH_eq_fun + + +def __CH_ne_fun(self, other): + """ + Check if the other is not equal to the object. + + always returns true if other is not a ClientHello object + """ + return not self.__eq__(other) + +messages.ClientHello.__ne__ = __CH_ne_fun + + +def format_bytearray(byte_array, formatstr): + """Format method for bytearrays.""" + if 'x' in formatstr: + return b2a_hex(byte_array) + else: + return repr(byte_array) + + +def format_array(array, formatstr): + """Return string representation of array while formatting elements.""" + if array is None: + return "None" + else: + str_array = [] + for elem in array: + if elem.__class__.__format__ is not object.__format__: + str_array += ["{0:{1}}".format(elem, formatstr)] + else: + str_array += [repr(elem)] + return "[" + ", ".join(str_array) + "]" + + +class ServerHello(messages.ServerHello): + """Class with enhanced human-readable serialisation.""" + + def __format__(self, formatstr): + """Return human readable representation of the object.""" + extensions = format_array(self.extensions, formatstr) + random = format_bytearray(self.random, formatstr) + session_id = format_bytearray(self.session_id, formatstr) + cipher_suite = CipherSuite.ietfNames.get(self.cipher_suite, + self.cipher_suite) + + if 'v' in formatstr: + cipher_suite = "CipherSuite.{0}".format(cipher_suite) + + # TODO cipher_suites (including verbose) + # TODO compression_method (including verbose) + return ("ServerHello(server_version=({0[0]}, {0[1]}), random={1}, " + "session_id={2!r}, cipher_suite={3}, compression_method={4}, " + "_tack_ext={5}, extensions={6})").format( + self.server_version, random, session_id, + cipher_suite, self.compression_method, self._tack_ext, + extensions) + + +class Certificate(messages.Certificate): + """Class with more robust certificate parsing and serialisation.""" + + def parse(self, parser): + """Deserialise the object from binary data.""" + index = parser.index + try: + return super(Certificate, self).parse(parser) + except (AssertionError, SyntaxError): + pass + parser.index = index + parser.startLengthCheck(3) + if self.certificateType == CertificateType.x509: + chainLength = parser.get(3) + index = 0 + certificate_list = [] + while index != chainLength: + certBytes = parser.getVarBytes(3) + certificate_list.append(certBytes) + index += len(certBytes)+3 + if certificate_list: + self.certChain = certificate_list + else: + raise AssertionError() + + parser.stopLengthCheck() + return self + + def __format__(self, formatstr): + """Advanced formatting for messages.""" + hexify = False + verbose = False + digest = False + if 'h' in formatstr: + hexify = True + if 'v' in formatstr: + verbose = True + if 'm' in formatstr: + digest = True + + if self.certChain is None: + cert_list = None + else: + if isinstance(self.certChain, X509CertChain): + cert_list = [cert.bytes for cert in self.certChain.x509List] + else: + cert_list = self.certChain + + if digest: + cert_list = "[" + ", ".join(b2a_hex(secureHash(cert, 'sha256')) + for cert in cert_list) + "]" + else: + cert_list = [repr(cert) for cert in cert_list] + + return "Certificate({0})".format(cert_list) + + +class NewSessionTicket(messages.HandshakeMsg): + """Class for handling the Session Tickets from RFC 5077.""" + + def __init__(self): + """Initilize new sesion ticket message object.""" + super(NewSessionTicket, self).__init__(HandshakeType.session_ticket) + self.ticket_lifetime_hintt = 0 + self.ticket = None + + def parse(self, parser): + """Parse the object from on-the-wire data.""" + self.ticket_lifetime_hint = parser.get(4) + self.ticket = parser.getVarBytes(2) + return self + + def __format__(self, formatstr): + """Return human-readable representation of the object.""" + ticket = format_bytearray(self.ticket, formatstr) + return "NewSessionTicket(ticket_lifetime_hint={0}, ticket={1})"\ + .format(self.ticket_lifetime_hintt, ticket) + + +class CertificateStatus(messages.HandshakeMsg): + """Class for handling the CertificateStatus OCSP staples from RFC 4366.""" + + def __init__(self): + """Create a certificate status message handling object.""" + super(CertificateStatus, self).__init__( + HandshakeType.certificate_status) + self.status_type = 0 + self.response = None + + def parse(self, parser): + """Deserialise certificate status message from binary data.""" + parser.startLengthCheck(3) + self.status_type = parser.get(1) + if self.status_type == 1: # FIXME, create registry + self.response = parser.getVarBytes(3) + else: + raise SyntaxError() # FIXME, use sane-er type + parser.stopLengthCheck() + return self + + def __format__(self, formatstr): + """Return human-readable representation of certificate status.""" + response = format_bytearray(self.response, formatstr) + return "CertificateStatus(status_type={0}, response={1})"\ + .format(self.status_type, response) + + +class Message(messages.Message): + """Message class with more robust formatting capability.""" + + def __format__(self, formatstr): + """Advanced formatting for messages.""" + hexify = False + verbose = "" + if 'h' in formatstr: + hexify = True + if 'v' in formatstr: + verbose = "ContentType." + + if hexify: + data = b2a_hex(self.data) + else: + data = repr(self.data) + + return "Message(contentType={0}{1}, data={2})"\ + .format(verbose, ContentType.toStr(self.contentType), data) + + +class ServerKeyExchange(messages.ServerKeyExchange): + """ServerKeyExchange class with more robust formatting capability.""" + + def __format__(self, formatstr): + """Return human-readable representation of the object.""" + if 'v' in formatstr: + verbose = "CipherSuite." + else: + verbose = "" + + ret = "ServerKeyExchange(cipherSuite={0}{1}, version={2}"\ + .format(verbose, CipherSuite.ietfNames[self.cipherSuite], + self.version) + if self.srp_N: + ret += ", srp_N={0}, srp_g={1}, srp_s={2}, srp_B={3}"\ + .format(self.srp_N, self.srp_g, self.srp_s, self.srp_B) + if self.dh_p: + ret += ", dh_p={0}, dh_g={1}, dh_Ys={2}"\ + .format(self.dh_p, self.dh_g, self.dh_Ys) + if self.ecdh_Ys: + ecdh_Ys = format_bytearray(self.ecdh_Ys, formatstr) + ret += ", curve_type={0}, named_curve={1}, ecdh_Ys={2}"\ + .format(ECCurveType.toStr(self.curve_type), + GroupName.toStr(self.named_curve), ecdh_Ys) + if self.signAlg: + ret += ", hashAlg={0}, signAlg={1}"\ + .format(HashAlgorithm.toStr(self.hashAlg), + SignatureAlgorithm.toStr(self.signAlg)) + if self.signature: + ret += ", signature={0}"\ + .format(format_bytearray(self.signature, formatstr)) + + return ret + ")" diff --git a/cscan/modifiers.py b/cscan/modifiers.py new file mode 100644 index 0000000..c69332f --- /dev/null +++ b/cscan/modifiers.py @@ -0,0 +1,29 @@ +# Copyright (c) 2016 Hubert Kario +# Released under Mozilla Public License 2.0 + +"""Methods for modifying the scan configurations on the fly.""" + +from __future__ import print_function + +proto_versions = {(3, 0): "SSLv3", + (3, 1): "TLSv1.0", + (3, 2): "TLSv1.1", + (3, 3): "TLSv1.2", + (3, 4): "TLSv1.3", + (3, 5): "TLSv1.4", + (3, 6): "TLSv1.5"} + + +def version_to_str(version): + """Convert a version tuple to human-readable string.""" + version_name = proto_versions.get(version, None) + if version_name is None: + version_name = "{0[0]}.{0[1]}".format(version) + return version_name + + +def set_hello_version(generator, version): + """Set client hello version.""" + generator.version = version + generator.modifications += [version_to_str(version)] + return generator diff --git a/cscan/scanner.py b/cscan/scanner.py new file mode 100644 index 0000000..e77b942 --- /dev/null +++ b/cscan/scanner.py @@ -0,0 +1,157 @@ +# Copyright (c) 2016 Hubert Kario +# Released under the Mozilla Public License 2.0 + +"""Classes used for scanning servers and getting their responses.""" + +import socket + +from .constants import CipherSuite, HandshakeType +from .messages import Certificate, ServerHello, Message, NewSessionTicket, \ + CertificateStatus, ServerKeyExchange +from tlslite.constants import CertificateType, ContentType +from tlslite.messages import \ + CertificateRequest, NextProtocol, ServerHelloDone, Alert +from tlslite.defragmenter import Defragmenter +from tlslite.messagesocket import MessageSocket +from tlslite.errors import TLSAbruptCloseError, TLSIllegalParameterException + + +class HandshakeParser(object): + """Inteligent parser for handshake messages.""" + + def __init__(self, version=(3, 1), + cipher_suite=CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA, + certificate_type=CertificateType.x509): + """Initialize parser object.""" + self.version = version + self.cipher_suite = cipher_suite + self.certificate_type = certificate_type + + def parse(self, parser): + """Parse a handshake message.""" + hs_type = parser.get(1) + if hs_type == HandshakeType.server_hello: + msg = ServerHello().parse(parser) + self.version = msg.server_version + self.cipher_suite = msg.cipher_suite + self.certificate_type = msg.certificate_type + return msg + elif hs_type == HandshakeType.certificate: + msg = Certificate(self.certificate_type) + elif hs_type == HandshakeType.server_key_exchange: + msg = ServerKeyExchange(self.cipher_suite, self.version) + elif hs_type == HandshakeType.certificate_request: + msg = CertificateRequest(self.version) + elif hs_type == HandshakeType.next_protocol: + msg = NextProtocol().parse(parser) + elif hs_type == HandshakeType.server_hello_done: + msg = ServerHelloDone() + elif hs_type == HandshakeType.session_ticket: + msg = NewSessionTicket() + elif hs_type == HandshakeType.certificate_status: + msg = CertificateStatus() + else: + raise ValueError("Unknown handshake type: {0}".format(hs_type)) + + # don't abort when we can't parse a message, save it as unparsed + try: + msg.parse(parser) + except SyntaxError: + msg = Message(ContentType.handshake, parser.bytes) + return msg + + +class Scanner(object): + """Helper class for scanning a host and returning serialised responses.""" + + def __init__(self, hello_gen, host, port=443, hostname=None): + """Initialize scanner.""" + self.host = host + self.hello_gen = hello_gen + self.port = port + self.hostname = hostname + + def scan(self): + """Perform a scan on server.""" + defragger = Defragmenter() + defragger.addStaticSize(ContentType.change_cipher_spec, 1) + defragger.addStaticSize(ContentType.alert, 2) + defragger.addDynamicSize(ContentType.handshake, 1, 3) + + try: + raw_sock = socket.create_connection((self.host, self.port), 5) + except socket.error as e: + return [e] + + sock = MessageSocket(raw_sock, defragger) + + if self.hostname is not None: + client_hello = self.hello_gen(bytearray(self.hostname, + 'utf-8')) + else: + client_hello = self.hello_gen(None) + + # record layer version - TLSv1.x + # use the version from configuration, if present, or default to the + # RFC recommended (3, 1) for TLS and (3, 0) for SSLv3 + if hasattr(client_hello, 'record_version'): + sock.version = client_hello.record_version + elif hasattr(self.hello_gen, 'record_version'): + sock.version = self.hello_gen.record_version + elif client_hello.client_version > (3, 1): # TLS1.0 + sock.version = (3, 1) + else: + sock.version = client_hello.client_version + + # we don't want to send invalid messages (SSLv2 hello in SSL record + # layer), so set the record layer version to SSLv2 if the hello is + # of SSLv2 format + if client_hello.ssl2: + sock.version = (0, 2) + + # save the record version used in the end for later analysis + client_hello.record_version = sock.version + + messages = [client_hello] + + handshake_parser = HandshakeParser() + + try: + sock.sendMessageBlocking(client_hello) + except socket.error as e: + messages.append(e) + return messages + except TLSAbruptCloseError as e: + sock.sock.close() + messages.append(e) + return messages + + # get all the server messages that affect connection, abort as soon + # as they've been read + try: + while True: + header, parser = sock.recvMessageBlocking() + + if header.type == ContentType.alert: + alert = Alert() + alert.parse(parser) + alert.record_version = header.version + messages += [alert] + elif header.type == ContentType.handshake: + msg = handshake_parser.parse(parser) + msg.record_version = header.version + messages += [msg] + if isinstance(msg, ServerHelloDone): + return messages + else: + raise TypeError("Unknown content type: {0}" + .format(header.type)) + except (TLSAbruptCloseError, TLSIllegalParameterException, + ValueError, TypeError, socket.error, SyntaxError) as e: + messages += [e] + return messages + finally: + try: + sock.sock.close() + except (socket.error, OSError): + pass diff --git a/cscan_tests/__init__.py b/cscan_tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cscan_tests/test_config.py b/cscan_tests/test_config.py new file mode 100644 index 0000000..13093a2 --- /dev/null +++ b/cscan_tests/test_config.py @@ -0,0 +1,50 @@ +# Copyright (c) 2015 Hubert Kario +# Released under Mozilla Public License Version 2.0 + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from tlslite.messages import ClientHello +from tlslite.extensions import SNIExtension, SupportedGroupsExtension, \ + ECPointFormatsExtension, NPNExtension, SignatureAlgorithmsExtension +from tlslite.utils.codec import Parser +from cscan.config import Firefox_42 +from cscan.extensions import RenegotiationExtension +from cscan.constants import ExtensionType + +class TestFirefox(unittest.TestCase): + def test_firefox_42(self): + gen = Firefox_42() + ch = gen(bytearray(b'example.com')) + + self.assertIsNotNone(ch) + self.assertIsInstance(ch, ClientHello) + self.assertEqual(len(ch.write()), 176) + self.assertEqual(ch.client_version, (3, 3)) + self.assertEqual(gen.record_version, (3, 1)) + self.assertEqual(len(ch.cipher_suites), 11) + self.assertIsInstance(ch.extensions[0], SNIExtension) + self.assertEqual(ch.extensions[1].extType, + ExtensionType.renegotiation_info) + self.assertIsInstance(ch.extensions[2], + SupportedGroupsExtension) + self.assertIsInstance(ch.extensions[3], + ECPointFormatsExtension) + self.assertEqual(ch.extensions[4].extType, + ExtensionType.session_ticket) + # bug in tlslite-ng, removes NPN extensions from provided extensions + #self.assertIsInstance(ch.extensions[5], + # NPNExtension) + self.assertEqual(ch.extensions[5].extType, + ExtensionType.alpn) + self.assertEqual(ch.extensions[6].extType, + ExtensionType.status_request) + self.assertIsInstance(ch.extensions[7], + SignatureAlgorithmsExtension) + self.assertEqual(ch.compression_methods, [0]) + + +if __name__ == "__main__": + unittest.main() diff --git a/cscan_tests/test_extensions.py b/cscan_tests/test_extensions.py new file mode 100644 index 0000000..4a3c045 --- /dev/null +++ b/cscan_tests/test_extensions.py @@ -0,0 +1,76 @@ +# Copyright (c) 2015 Hubert Kario +# Released under Mozilla Public License Version 2.0 + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from tlslite.utils.codec import Parser +from cscan.extensions import KeyShareExtension +from cscan.constants import GroupName + +class TestKeyShareExtension(unittest.TestCase): + def test___init__(self): + ext = KeyShareExtension() + + self.assertIsNotNone(ext) + + def test_create(self): + ext = KeyShareExtension() + + ext.create([(1, bytearray(b'\x12')), + (2, bytearray(b'\x33'))]) + + self.assertEqual(ext.client_shares, [(1, bytearray(b'\x12')), + (2, bytearray(b'\x33'))]) + + def test_write(self): + ext = KeyShareExtension() + + ext.create([(GroupName.secp256r1, bytearray(b'\xff\xfa')), + (GroupName.ffdhe2048, bytearray(b'\xaf\xaa'))]) + + data = ext.write() + + self.assertEqual(data, bytearray( + b'\x00\x2a\x00\x0d' + b'\x00\x0b' + b'\x00\x17\x02\xff\xfa' + b'\x01\x00\x00\x02\xaf\xaa')) + + def test_write_with_no_data(self): + ext = KeyShareExtension() + + data = ext.write() + + self.assertEqual(data, bytearray(b'\x00\x2a\x00\x00')) + + def test_parse(self): + parser = Parser(bytearray( + #b'\x00\x2a\x00\x0d' + b'\x00\x0b' + b'\x00\x17\x02\xff\xfa' + b'\x01\x00\x00\x02\xaf\xaa')) + + ext = KeyShareExtension() + ext.parse(parser) + + self.assertEqual(ext.client_shares, + [(GroupName.secp256r1, bytearray(b'\xff\xfa')), + (GroupName.ffdhe2048, bytearray(b'\xaf\xaa'))]) + + def test_parse_with_no_data(self): + parser = Parser(bytearray()) + + ext = KeyShareExtension() + ext.parse(parser) + + self.assertIsNone(ext.client_shares) + + def test___repr__(self): + ext = KeyShareExtension() + ext.create([(1, bytearray(b'\xff'))]) + + self.assertEqual("KeyShareExtension([(1, bytearray(b\'\\xff\'))])", + repr(ext)) diff --git a/top1m/parse_results.py b/top1m/parse_results.py index f163a88..2322941 100644 --- a/top1m/parse_results.py +++ b/top1m/parse_results.py @@ -113,6 +113,7 @@ eccordering = defaultdict(int) ecccurve = defaultdict(int) ocspstaple = defaultdict(int) fallbacks = defaultdict(int) +intolerancies = defaultdict(int) # array with indexes of fallback names for the matrix report fallback_ids = defaultdict(int) i=0 @@ -175,6 +176,7 @@ for r,d,flist in os.walk(path): tempeccordering = "unknown" tempecccurve = {} tempfallbacks = {} + tempintolerancies = {} """ supported ciphers by the server under scan """ tempcipherstats = {} temppfssigalgordering = {} @@ -349,6 +351,21 @@ for r,d,flist in os.walk(path): except KeyError: pass + if 'intolerancies' in results: + intoler = results['intolerancies'] + for name, val in intoler.items(): + if val is True: + tempintolerancies[name] = 1 + size_intol = [x.replace('size ', '') + for x in tempintolerancies.keys() + if x.startswith('size ')] + if size_intol: + size_intol.sort(reverse=True) + tempintolerancies['size {0}' + .format(" ".join(size_intol))] = 1 + else: + tempintolerancies['x:missing information'] = 1 + """ get some extra data about server """ if 'renegotiation' in results: temprenegotiation[results['renegotiation']] = 1 @@ -573,6 +590,9 @@ for r,d,flist in os.walk(path): for s in tempfallbacks: fallbacks[s] += 1 + for s in tempintolerancies: + intolerancies[s] += 1 + for s in tempsigstats: sigalg[s] += 1 @@ -902,3 +922,9 @@ print("------------------------") fallback_ids_sorted=sorted(fallback_ids.items(), key=operator.itemgetter(1)) for touple in fallback_ids_sorted: print(str(touple[1]+1).rjust(3) + " " + str(touple[0])) + +print("\nClient Hello intolerance Count Percent") +print("----------------------------------------+---------+-------") +for stat in sorted(intolerancies): + percent = round(intolerancies[stat] / total * 100, 4) + sys.stdout.write(stat.ljust(40) + " " + str(intolerancies[stat]).ljust(10) + str(percent).ljust(4) + "\n")