TLS version (in)tolerance scanner

Since it is impossible to make openssl command line tool send
TLSv1.3 Client Hello message, add a python based tool to perform
TLS version intolerance scan
This commit is contained in:
Hubert Kario 2016-10-05 01:00:11 +02:00
parent e5b747d29b
commit 45bb7d0c28
14 changed files with 1404 additions and 1 deletions

8
.gitignore vendored
View File

@ -1,2 +1,10 @@
mozilla/*
top1m/results/*
tlslite
.tlslite-ng
*/__pycache__
*.pyc
.coverage
.python-ecdsa
ecdsa
tlslite

View File

@ -980,6 +980,8 @@ display_results_in_terminal() {
done | sort
fi
fi
echo "$cscan_tests"
}
display_results_in_json() {
@ -1080,7 +1082,12 @@ display_results_in_json() {
echo -n "}"
ctr=$((ctr+1))
done
echo '}}'
echo -n '}'
if [[ -n $cscan_tests ]]; then
echo -n ',"intolerancies":'
echo -n "$cscan_tests"
fi
echo '}'
}
test_serverside_ordering() {
@ -1550,6 +1557,24 @@ test_tls_tolerance() {
tls_tolerance['small-SSLv3']="True $current_protocol $current_cipher $current_trusted"
fi
fi
# finally run the Python based test to perform more precise scan
options=()
options+=("$DIRNAMEPATH/cscan.sh")
if [[ "$OUTPUTFORMAT" == "json" ]]; then
options+=(-j)
else
if [[ $VERBOSE != 0 ]]; then
options+=("-v" "--no-header")
else
options+=("--no-header")
fi
fi
options+=("$TARGET")
if [[ -n $sni_target ]]; then
options+=("$sni_target")
fi
cscan_tests="$(${options[*]})"
}
test_kex_sigalgs() {
@ -2101,6 +2126,11 @@ if [[ $VERBOSE != 0 ]] ; then
fi
SCLIENTARGS="${PARAMS[*]}"
# we need the SNI for cscan, so save it if it was provided through
# OpenSSL options
if [[ $SCLIENTARGS =~ servername[\ ]*([^\ ]*)[\ ]* ]]; then
sni_target="${BASH_REMATCH[1]}"
fi
# only append the SNI:
# if the target is a hostname by validating the tld
# if -servername was not supplied by the user
@ -2109,6 +2139,7 @@ if [[ $SNI == "True" && ! $SCLIENTARGS =~ servername ]]; then
SCLIENTARGS="$SCLIENTARGS -servername $sni_target"
else
echo "Warning: target is not a FQDN. SNI was disabled. Use a FQDN or '-servername <fqdn>'" 1>&2
sni_target=''
fi
fi
debug "sclientargs: $SCLIENTARGS"

265
cscan.py Normal file
View File

@ -0,0 +1,265 @@
# Copyright 2016(c) Hubert Kario
# This work is released under the Mozilla Public License Version 2.0
"""tlslite-ng based server configuration (and bug) scanner."""
from __future__ import print_function
from tlslite.messages import ClientHello, ServerHello, ServerHelloDone, Alert
from tlslite.constants import CipherSuite, \
AlertLevel
import sys
import json
import getopt
import itertools
from cscan.scanner import Scanner
from cscan.config import Firefox_42
from cscan.modifiers import set_hello_version
def scan_with_config(host, port, conf, hostname, __sentry=None, __cache={}):
"""Connect to server and return set of exchanged messages."""
assert __sentry is None
key = (host, port, conf, hostname)
if key in __cache:
if verbose and not json_out:
print(":", end='')
return __cache[key]
scanner = Scanner(conf, host, port, hostname)
ret = scanner.scan()
__cache[key] = ret
if verbose and not json_out:
print(".", end='')
sys.stdout.flush()
return ret
def simple_inspector(result):
"""
Perform simple check to see if connection was successful.
Returns True is connection was successful, server replied with
ServerHello and ServerHelloDone messages, and the cipher selected
was present in ciphers advertised by client, False otherwise
"""
if any(isinstance(x, ServerHelloDone) for x in result):
ch = next((x for x in result if isinstance(x, ClientHello)), None)
sh = next((x for x in result if isinstance(x, ServerHello)), None)
if ch and sh:
if sh.cipher_suite not in ch.cipher_suites:
# FAILURE cipher suite mismatch
return False
return True
# incomplete response or error
return False
def verbose_inspector(desc, result):
"""Describe the connection result in human-readable form."""
ret = "{0}:".format(desc)
if any(isinstance(x, ServerHelloDone) for x in result):
ch = next((x for x in result if isinstance(x, ClientHello)), None)
sh = next((x for x in result if isinstance(x, ServerHello)), None)
if sh and ch:
if sh.cipher_suite not in ch.cipher_suites:
ret += " FAILURE cipher suite mismatch"
return ret
name = CipherSuite.ietfNames[sh.cipher_suite] \
if sh.cipher_suite in CipherSuite.ietfNames \
else hex(sh.cipher_suite)
ret += " ok: {0}, {1}".format(sh.server_version,
name)
return ret
ret += " FAILURE "
errors = []
for msg in result:
if isinstance(msg, ClientHello):
continue
# check if returned message supports custom formatting
if msg.__class__.__format__ is not object.__format__:
errors += ["{:vxm}".format(msg)]
else:
errors += [repr(msg)]
# skip printing close errors after fatal alerts, they are expected
if isinstance(msg, Alert) and msg.level == AlertLevel.fatal:
break
ret += "\n".join(errors)
return ret
configs = {}
def load_configs():
"""Load known client configurations for later use in scanning."""
base_configs = [Firefox_42]
for conf in base_configs:
for version in ((3, 1), (3, 2), (3, 3), (3, 4), (3, 5), (3, 254)):
if conf().version != version:
# just changed version
gen = set_hello_version(conf(), version)
if gen.record_version > version:
gen.record_version = version
configs[gen.name] = gen
# Firefox 42 configs
gen = Firefox_42()
configs[gen.name] = gen
def scan_TLS_intolerancies(host, port, hostname):
"""Look for intolerancies (version, extensions, ...) in a TLS server."""
results = {}
def result_iterator(predicate):
"""
Selecting iterator over cached results.
Looks for matching result from already performed scans
"""
return (not simple_inspector(results[name]) for name in results
if predicate(configs[name]))
def result_cache(name, conf):
"""Perform scan if config is not in results, caches result."""
return results[name] if name in results \
else results.setdefault(name, scan_with_config(host, port, conf,
hostname))
def conf_iterator(predicate):
"""
Caching, selecting iterator over configs.
Returns an iterator that will go over configs that match the provided
predicate (a function that returns true or false depending if given
config is ok for test at hand) while saving the results to the
cache/verbose `results` log/dictionary
The iterator returns False for every connection that succeeded
(meaning the server is NOT intolerant to config and True to mean
that server IS intolerant to config.
"""
scan_iter = (not simple_inspector(result_cache(name, conf))
for name, conf in configs.items()
if predicate(conf))
return itertools.chain(result_iterator(predicate), scan_iter)
host_up = not all(conf_iterator(lambda conf: True))
intolerancies = {}
if not host_up:
if json_out:
print(json.dumps(intolerancies))
else:
print("Host does not seem to support SSL or TLS protocol")
return
intolerancies["SSL 3.254"] = all(conf_iterator(lambda conf:
conf.version == (3, 254)))
intolerancies["TLS 1.4"] = all(conf_iterator(lambda conf:
conf.version == (3, 5)))
intolerancies["TLS 1.3"] = all(conf_iterator(lambda conf:
conf.version == (3, 4)))
intolerancies["TLS 1.2"] = all(conf_iterator(lambda conf:
conf.version == (3, 3)))
intolerancies["TLS 1.1"] = all(conf_iterator(lambda conf:
conf.version == (3, 2)))
intolerancies["TLS 1.0"] = all(conf_iterator(lambda conf:
conf.version == (3, 1)))
if json_out:
print(json.dumps(intolerancies))
else:
if not no_header:
if verbose:
print()
print("Host {0}:{1} scan complete".format(host, port))
if hostname:
print("SNI hostname used: {0}".format(hostname))
if verbose:
print()
print("Individual probe results:")
for desc, ret in sorted(results.items()):
print(verbose_inspector(desc, ret))
print()
print("Intolerance to:")
for intolerance, value in sorted(intolerancies.items()):
print(" {0:20}: {1}".format(intolerance,
"PRESENT" if value else "absent"))
def single_probe(name):
"""Run a single probe against a server, print result."""
print(verbose_inspector(name, scan_with_config(host, port,
configs[name], hostname)))
def usage():
"""Print usage information."""
print("./cscan.py [ARGUMENTS] host[:port] [SNI-HOST-NAME]")
print()
print("-l, --list List probe names")
print("-p name, --probe Run just a single probe")
print("-j, --json Output in JSON format")
print("-v, --verbose Use verbose output")
if __name__ == "__main__":
try:
opts, args = getopt.getopt(sys.argv[1:],
"jvhlp:",
["json", "verbose", "help", "list",
"probe=", "no-header"])
except getopt.GetoptError as err:
print(err)
usage()
sys.exit(2)
json_out = False
verbose = False
list_probes = False
run_probe = None
no_header = False
for opt, arg in opts:
if opt in ('-j', '--json'):
json_out = True
elif opt in ('-v', '--verbose'):
verbose = True
elif opt in ('-h', '--help'):
usage()
sys.exit(0)
elif opt in ('-l', '--list'):
list_probes = True
elif opt in ('-p', '--probe'):
run_probe = arg
elif opt in ('--no-header', ):
no_header = True
else:
raise AssertionError("Unknown option {0}".format(opt))
if len(args) > 2:
print("Too many arguments")
usage()
sys.exit(2)
load_configs()
if list_probes:
for desc, ret in sorted(configs.items()):
print("{0}: {1}".format(desc, ret.__doc__))
sys.exit(0)
hostname = None
if len(args) == 2:
hostname = args[1]
hostaddr = args[0].split(":")
if len(hostaddr) > 1:
host, port = hostaddr
else:
host = hostaddr[0]
port = 443
if run_probe:
single_probe(run_probe)
else:
scan_TLS_intolerancies(host, port, hostname)

26
cscan.sh Executable file
View File

@ -0,0 +1,26 @@
#!/bin/bash
pushd "$(dirname ${BASH_SOURCE[0]})" > /dev/null
if [ ! -d ./tlslite ]; then
git clone --depth=1 https://github.com/tomato42/tlslite-ng.git .tlslite-ng
ln -s .tlslite-ng/tlslite tlslite
fi
if [ ! -d ./ecdsa ]; then
git clone --depth=1 https://github.com/warner/python-ecdsa.git .python-ecdsa
ln -s .python-ecdsa/ecdsa ecdsa
fi
# update the code if it is running in interactive terminal
#if [[ -t 1 ]]; then
if [[ $UPDATE ]]; then
pushd .tlslite-ng >/dev/null
git pull origin master --quiet
popd >/dev/null
pushd .python-ecdsa >/dev/null
git pull origin master --quiet
popd >/dev/null
fi
PYTHONPATH=. python cscan.py "$@"
ret=$?
popd > /dev/null
exit $ret

0
cscan/__init__.py Normal file
View File

134
cscan/config.py Normal file
View File

@ -0,0 +1,134 @@
# Copyright (c) 2016 Hubert Kario <hkario@redhat.com>
# Released under Mozilla Public License Version 2.0
"""Typical Client Hello messages sent by different clients."""
import random
from tlslite.messages import ClientHello
from tlslite.constants import \
ECPointFormat, HashAlgorithm, SignatureAlgorithm
from tlslite.extensions import SNIExtension, SupportedGroupsExtension, \
TLSExtension, SignatureAlgorithmsExtension, NPNExtension, \
ECPointFormatsExtension
from tlslite.utils.cryptomath import numberToByteArray
from .constants import CipherSuite, ExtensionType, GroupName
class HelloConfig(object):
"""Base object for all Client Hello configurations."""
def __init__(self):
"""Initialize object with default settings."""
self._name = None
self.modifications = []
self.callbacks = []
self.version = (3, 3)
self.record_version = (3, 0)
self.ciphers = []
self.extensions = None
self.random = None
self.session_id = bytearray(0)
self.compression_methods = [0]
self.ssl2 = False
@property
def name(self):
"""Return the name of config with all the modifications applied."""
if self.modifications:
return "{0} ({1})".format(self._name,
", ".join(self.modifications))
else:
return self._name
@name.setter
def name(self, value):
"""Set the base name of the configuration."""
self._name = value
def __call__(self, hostname):
"""Generate a client hello object, use hostname in SNI extension."""
# SNI is special in that we don't want to send it if it is empty
if self.extensions:
sni = next((x for x in self.extensions
if isinstance(x, SNIExtension)),
None)
if sni:
if hostname is not None:
if sni.serverNames is None:
sni.serverNames = []
sni.hostNames = [hostname]
else:
# but if we were not provided with a host name, we want
# to remove empty extension
if sni.serverNames is None:
self.extensions = [x for x in self.extensions
if not isinstance(x, SNIExtension)]
if self.random:
rand = self.random
else:
# we're not doing any crypto with it, just need "something"
# TODO: place unix time at the beginning
rand = numberToByteArray(random.getrandbits(256), 32)
ch = ClientHello(self.ssl2).create(self.version, rand, self.session_id,
self.ciphers,
extensions=self.extensions)
ch.compression_methods = self.compression_methods
for cb in self.callbacks:
ch = cb(ch)
return ch
class Firefox_42(HelloConfig):
"""Create Client Hello like Firefox 42."""
def __init__(self):
"""Set the configuration to Firefox 42."""
super(Firefox_42, self).__init__()
self._name = "Firefox 42"
self.version = (3, 3)
self.record_version = (3, 1)
self.ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA]
ext = self.extensions = []
ext.append(SNIExtension())
ext.append(TLSExtension(extType=ExtensionType.renegotiation_info)
.create(bytearray(1)))
ext.append(SupportedGroupsExtension().create([GroupName.secp256r1,
GroupName.secp384r1,
GroupName.secp521r1]))
ext.append(ECPointFormatsExtension()
.create([ECPointFormat.uncompressed]))
ext.append(TLSExtension(extType=ExtensionType.session_ticket))
ext.append(NPNExtension())
ext.append(TLSExtension(extType=ExtensionType.alpn)
.create(bytearray(b'\x00\x15' +
b'\x02' + b'h2' +
b'\x08' + b'spdy/3.1' +
b'\x08' + b'http/1.1')))
ext.append(TLSExtension(extType=ExtensionType.status_request)
.create(bytearray(b'\x01' +
b'\x00\x00' +
b'\x00\x00')))
sig_algs = []
for alg in ['sha256', 'sha384', 'sha512', 'sha1']:
sig_algs.append((getattr(HashAlgorithm, alg),
SignatureAlgorithm.rsa))
for alg in ['sha256', 'sha384', 'sha512', 'sha1']:
sig_algs.append((getattr(HashAlgorithm, alg),
SignatureAlgorithm.ecdsa))
for alg in ['sha256', 'sha1']:
sig_algs.append((getattr(HashAlgorithm, alg),
SignatureAlgorithm.dsa))
ext.append(SignatureAlgorithmsExtension()
.create(sig_algs))

165
cscan/constants.py Normal file
View File

@ -0,0 +1,165 @@
# Copyright 2016(c) Hubert Kario
# This work is released under the Mozilla Public License Version 2.0
"""Extend the tlslite-ng constants with values it does not support."""
import tlslite.constants
from tlslite.constants import CipherSuite
CipherSuite.ecdheEcdsaSuites = []
# RFC 5289
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384 = 0xC02C
CipherSuite.ietfNames[0xC02C] = 'TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384'
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384)
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 = 0xC02B
CipherSuite.ietfNames[0xC02B] = 'TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256'
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256)
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384 = 0xC024
CipherSuite.ietfNames[0xC024] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384'
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384)
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256 = 0xC023
CipherSuite.ietfNames[0xC023] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256'
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256)
# RFC 4492
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA = 0xC00A
CipherSuite.ietfNames[0xC00A] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA'
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA)
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA = 0xC009
CipherSuite.ietfNames[0xC009] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA'
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA)
# RFC 7251
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CCM = 0xC0Ad
CipherSuite.ietfNames[0xC0AD] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CCM'
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
TLS_ECDHE_ECDSA_WITH_AES_256_CCM)
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM = 0xC0AC
CipherSuite.ietfNames[0xC0AC] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CCM'
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
TLS_ECDHE_ECDSA_WITH_AES_128_CCM)
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8 = 0xC0AF
CipherSuite.ietfNames[0xC0AF] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8'
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8)
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8 = 0xC0AE
CipherSuite.ietfNames[0xC0AE] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8'
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8)
CipherSuite.ecdhAllSuites.extend(CipherSuite.ecdheEcdsaSuites)
CipherSuite.certAllSuites.extend(CipherSuite.ecdheEcdsaSuites)
# obsolete stuff
CipherSuite.TLS_RSA_WITH_DES_CBC_SHA = 0x0009
CipherSuite.ietfNames[0x0009] = 'TLS_RSA_WITH_DES_CBC_SHA'
CipherSuite.TLS_RSA_EXPORT1024_WITH_RC4_56_SHA = 0x0064
CipherSuite.ietfNames[0x0064] = 'TLS_RSA_EXPORT1024_WITH_RC4_56_SHA'
CipherSuite.TLS_RSA_EXPORT1024_WITH_DES_CBC_SHA = 0x0062
CipherSuite.ietfNames[0x0062] = 'TLS_RSA_EXPORT1024_WITH_DES_CBC_SHA'
CipherSuite.TLS_RSA_EXPORT_WITH_RC4_40_MD5 = 0x0003
CipherSuite.ietfNames[0x0003] = 'TLS_RSA_EXPORT_WITH_RC4_40_MD5'
CipherSuite.TLS_RSA_EXPORT_WITH_RC2_CBC_40_MD5 = 0x0006
CipherSuite.ietfNames[0x0006] = 'TLS_RSA_EXPORT_WITH_RC2_CBC_40_MD5'
# DSS
CipherSuite.dheDssSuites = []
CipherSuite.TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA = 0x0013
CipherSuite.ietfNames[0x0013] = 'TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA'
CipherSuite.dheDssSuites.append(CipherSuite.
TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA)
CipherSuite.TLS_DHE_DSS_WITH_DES_CBC_SHA = 0x0012
CipherSuite.ietfNames[0x0012] = 'TLS_DHE_DSS_WITH_DES_CBC_SHA'
CipherSuite.dheDssSuites.append(CipherSuite.
TLS_DHE_DSS_WITH_DES_CBC_SHA)
CipherSuite.TLS_DHE_DSS_EXPORT1024_WITH_DES_CBC_SHA = 0x0063
CipherSuite.ietfNames[0x0063] = 'TLS_DHE_DSS_EXPORT1024_WITH_DES_CBC_SHA'
CipherSuite.dheDssSuites.append(CipherSuite.
TLS_DHE_DSS_EXPORT1024_WITH_DES_CBC_SHA)
CipherSuite.TLS_DHE_DSS_WITH_AES_128_CBC_SHA = 0x0032
CipherSuite.ietfNames[0x0032] = 'TLS_DHE_DSS_WITH_AES_128_CBC_SHA'
CipherSuite.dheDssSuites.append(CipherSuite.
TLS_DHE_DSS_WITH_AES_128_CBC_SHA)
CipherSuite.TLS_DHE_DSS_WITH_AES_256_CBC_SHA = 0x0038
CipherSuite.ietfNames[0x0038] = 'TLS_DHE_DSS_WITH_AES_256_CBC_SHA'
CipherSuite.dheDssSuites.append(CipherSuite.
TLS_DHE_DSS_WITH_AES_256_CBC_SHA)
CipherSuite.TLS_DHE_DSS_WITH_AES_128_CBC_SHA256 = 0x0040
CipherSuite.ietfNames[0x0040] = 'TLS_DHE_DSS_WITH_AES_128_CBC_SHA256'
CipherSuite.dheDssSuites.append(CipherSuite.
TLS_DHE_DSS_WITH_AES_128_CBC_SHA256)
CipherSuite.TLS_DHE_DSS_WITH_AES_256_CBC_SHA256 = 0x006a
CipherSuite.ietfNames[0x006a] = 'TLS_DHE_DSS_WITH_AES_256_CBC_SHA256'
CipherSuite.dheDssSuites.append(CipherSuite.
TLS_DHE_DSS_WITH_AES_256_CBC_SHA256)
class ExtensionType(tlslite.constants.ExtensionType):
"""Definitions of TLS extension IDs."""
status_request = 5
alpn = 16
session_ticket = 35
heartbeat = 15 # RFC 6520
status_request_v2 = 17 # RFC 6961
padding = 21 # RFC 7685
max_fragment_legth = 1 # RFC 6066
# From: Eric Rescorla <ekr at rtfm.com>
# Date: Mon, 7 Dec 2015 05:36:22 -0800
# [TLS] TLS 1.3 ServerConfiguration
early_data = 40
pre_shared_key = 41
key_share = 42
cookie = 43
class GroupName(tlslite.constants.GroupName):
"""ECDH and FFDH key exchange group names."""
allEC = list(tlslite.constants.GroupName.allEC)
allFF = list(tlslite.constants.GroupName.allFF)
ecdh_x25519 = 29
allEC.append(ecdh_x25519)
ecdh_x448 = 30
allEC.append(ecdh_x448)
eddsa_ed25519 = 31
allEC.append(eddsa_ed25519)
eddsa_ed448 = 32
allEC.append(eddsa_ed448)
all = allEC + allFF
class HandshakeType(tlslite.constants.HandshakeType):
"""Type of messages in Handshake protocol."""
certificate_status = 22
session_ticket = 4

198
cscan/extensions.py Normal file
View File

@ -0,0 +1,198 @@
# Copyright 2016(c) Hubert Kario
# This work is released under the Mozilla Public License Version 2.0
"""Extra TLS extensions."""
import tlslite.extensions
from tlslite.utils.codec import Writer
from tlslite.utils.compat import b2a_hex
from .constants import ExtensionType, GroupName
import .messages
# make TLSExtensions hashable (__eq__ is already defined in base class)
tlslite.extensions.TLSExtension.__hash__ = lambda self: hash(self.extType) ^ \
hash(bytes(self.extData))
class RenegotiationExtension(tlslite.extensions.TLSExtension):
"""Secure Renegotiation extension RFC 5746."""
def __init__(self):
"""Initialize secure renegotiation extension."""
super(RenegotiationExtension, self).__init__(
extType=ExtensionType.renegotiation_info)
self.renegotiated_connection = None
def create(self, data):
"""Set the value of the Finished message."""
self.renegotiated_connection = data
@property
def extData(self):
"""Serialise the extension."""
if self.renegotiated_connection is None:
return bytearray(0)
writer = Writer()
writer.addVarSeq(self.renegotiated_connection, 1, 1)
return writer.bytes
def parse(self, parser):
"""Deserialise the extension from binary data."""
if parser.getRemainingLength() == 0:
self.renegotiated_connection = None
return
self.renegotiated_connection = parser.getVarBytes(1)
return self
def __repr__(self):
"""Human readable representation of extension."""
return "RenegotiationExtension(renegotiated_connection={0!r})"\
.format(self.renegotiated_connection)
def __format__(self, formatstr):
"""Formatted representation of extension."""
data = messages.format_bytearray(self.renegotiated_connection,
formatstr)
return "RenegotiationExtension(renegotiated_connection={0})"\
.format(data)
tlslite.extensions.TLSExtension._universalExtensions[
ExtensionType.renegotiation_info] = RenegotiationExtension
class SessionTicketExtension(tlslite.extensions.TLSExtension):
"""Session Ticket extension (a.k.a. OCSP staple)."""
def __init__(self):
"""Create Session Ticket extension."""
super(SessionTicketExtension, self).__init__(
extType=ExtensionType.session_ticket)
self.data = bytearray(0)
def parse(self, parser):
"""Deserialise the extension from binary data."""
self.data = parser.bytes
return self
def __format__(self, formatstr):
"""Print extension data in human-readable form."""
data = messages.format_bytearray(self.data, formatstr)
return "SessionTicketExtension(data={0})".format(data)
tlslite.extensions.TLSExtension._universalExtensions[
ExtensionType.session_ticket] = SessionTicketExtension
class ServerStatusRequestExtension(tlslite.extensions.TLSExtension):
"""Server Status Request extension."""
def __init__(self):
"""Create server status request extension."""
super(ServerStatusRequestExtension, self).__init__(
extType=ExtensionType.status_request)
def parse(self, parser):
"""Deserialise the extension from binary data."""
if parser.getRemainingLength() != 0:
raise SyntaxError() # FIXME
return self
def __repr__(self):
"""Human readable representation of the object."""
return "ServerStatusRequestExtension()"
tlslite.extensions.TLSExtension._serverExtensions[
ExtensionType.status_request] = ServerStatusRequestExtension
class KeyShareExtension(tlslite.extensions.TLSExtension):
"""TLS1.3 extension for handling key negotiation."""
def __init__(self):
"""Create key share extension object."""
super(KeyShareExtension, self).__init__(
extType=ExtensionType.key_share)
self.client_shares = None
def create(self, shares):
"""
Set the list of key shares to send.
@type shares: list of tuples
@param shares: a list of tuples where the first element is a NamedGroup
ID while the second element in a tuple is an opaque bytearray encoding
of the key share.
"""
self.client_shares = shares
return self
@property
def extData(self):
"""Serialise the extension."""
if self.client_shares is None:
return bytearray(0)
writer = Writer()
for group_id, share in self.client_shares:
writer.add(group_id, 2)
if group_id in GroupName.allFF:
share_length_length = 2
else:
share_length_length = 1
writer.addVarSeq(share, 1, share_length_length)
ext_writer = Writer()
ext_writer.add(len(writer.bytes), 2)
ext_writer.bytes += writer.bytes
return ext_writer.bytes
def parse(self, parser):
"""Deserialise the extension."""
if parser.getRemainingLength() == 0:
self.client_shares = None
return
self.client_shares = []
parser.startLengthCheck(2)
while not parser.atLengthCheck():
group_id = parser.get(2)
if group_id in GroupName.allFF:
share_length_length = 2
else:
share_length_length = 1
share = parser.getVarBytes(share_length_length)
self.client_shares.append((group_id, share))
return self
def __repr__(self):
"""Human readble representation of extension."""
return "KeyShareExtension({0!r})".format(self.client_shares)
def __format__(self, formatstr):
"""Formattable representation of extension."""
if self.client_shares is None:
return "KeyShareExtension(None)"
verbose = ""
hexlify = False
if 'v' in formatstr:
verbose = "GroupName."
if 'h' in formatstr:
hexlify = True
shares = []
for group_id, share in self.client_shares:
if hexlify:
share = b2a_hex(share)
else:
share = repr(share)
shares += ["({0}{1}, {2})".format(verbose,
GroupName.toStr(group_id),
share)]
return "KeyShareExtension([" + ",".join(shares) + "])"
tlslite.extensions.TLSExtension._universalExtensions[
ExtensionType.key_share] = KeyShareExtension

264
cscan/messages.py Normal file
View File

@ -0,0 +1,264 @@
# Copyright (c) 2016 Hubert Kario
# Released under Mozilla Public License 2.0
"""Extensions and modification of the tlslite-ng messages classes."""
import tlslite.messages as messages
from tlslite.utils.compat import b2a_hex
from tlslite.constants import ContentType, CertificateType, ECCurveType, \
HashAlgorithm, SignatureAlgorithm
from tlslite.x509certchain import X509CertChain
from tlslite.utils.cryptomath import secureHash
from .constants import HandshakeType, CipherSuite, GroupName
# gotta go fast
# comparing client hello's using ClientHello.write() is painfully slow
# monkey patch in faster compare methods
def __CH_eq_fun(self, other):
"""
Check if the other is equal to the object.
always returns false if other is not a ClientHello object
"""
if not isinstance(other, messages.ClientHello):
return False
return self.ssl2 == other.ssl2 and \
self.client_version == other.client_version and \
self.random == other.random and \
self.session_id == other.session_id and \
self.cipher_suites == other.cipher_suites and \
self.compression_methods == other.compression_methods and \
self.extensions == other.extensions
messages.ClientHello.__eq__ = __CH_eq_fun
def __CH_ne_fun(self, other):
"""
Check if the other is not equal to the object.
always returns true if other is not a ClientHello object
"""
return not self.__eq__(other)
messages.ClientHello.__ne__ = __CH_ne_fun
def format_bytearray(byte_array, formatstr):
"""Format method for bytearrays."""
if 'x' in formatstr:
return b2a_hex(byte_array)
else:
return repr(byte_array)
def format_array(array, formatstr):
"""Return string representation of array while formatting elements."""
if array is None:
return "None"
else:
str_array = []
for elem in array:
if elem.__class__.__format__ is not object.__format__:
str_array += ["{0:{1}}".format(elem, formatstr)]
else:
str_array += [repr(elem)]
return "[" + ", ".join(str_array) + "]"
class ServerHello(messages.ServerHello):
"""Class with enhanced human-readable serialisation."""
def __format__(self, formatstr):
"""Return human readable representation of the object."""
extensions = format_array(self.extensions, formatstr)
random = format_bytearray(self.random, formatstr)
session_id = format_bytearray(self.session_id, formatstr)
cipher_suite = CipherSuite.ietfNames.get(self.cipher_suite,
self.cipher_suite)
if 'v' in formatstr:
cipher_suite = "CipherSuite.{0}".format(cipher_suite)
# TODO cipher_suites (including verbose)
# TODO compression_method (including verbose)
return ("ServerHello(server_version=({0[0]}, {0[1]}), random={1}, "
"session_id={2!r}, cipher_suite={3}, compression_method={4}, "
"_tack_ext={5}, extensions={6})").format(
self.server_version, random, session_id,
cipher_suite, self.compression_method, self._tack_ext,
extensions)
class Certificate(messages.Certificate):
"""Class with more robust certificate parsing and serialisation."""
def parse(self, parser):
"""Deserialise the object from binary data."""
index = parser.index
try:
return super(Certificate, self).parse(parser)
except (AssertionError, SyntaxError):
pass
parser.index = index
parser.startLengthCheck(3)
if self.certificateType == CertificateType.x509:
chainLength = parser.get(3)
index = 0
certificate_list = []
while index != chainLength:
certBytes = parser.getVarBytes(3)
certificate_list.append(certBytes)
index += len(certBytes)+3
if certificate_list:
self.certChain = certificate_list
else:
raise AssertionError()
parser.stopLengthCheck()
return self
def __format__(self, formatstr):
"""Advanced formatting for messages."""
hexify = False
verbose = False
digest = False
if 'h' in formatstr:
hexify = True
if 'v' in formatstr:
verbose = True
if 'm' in formatstr:
digest = True
if self.certChain is None:
cert_list = None
else:
if isinstance(self.certChain, X509CertChain):
cert_list = [cert.bytes for cert in self.certChain.x509List]
else:
cert_list = self.certChain
if digest:
cert_list = "[" + ", ".join(b2a_hex(secureHash(cert, 'sha256'))
for cert in cert_list) + "]"
else:
cert_list = [repr(cert) for cert in cert_list]
return "Certificate({0})".format(cert_list)
class NewSessionTicket(messages.HandshakeMsg):
"""Class for handling the Session Tickets from RFC 5077."""
def __init__(self):
"""Initilize new sesion ticket message object."""
super(NewSessionTicket, self).__init__(HandshakeType.session_ticket)
self.ticket_lifetime_hintt = 0
self.ticket = None
def parse(self, parser):
"""Parse the object from on-the-wire data."""
self.ticket_lifetime_hint = parser.get(4)
self.ticket = parser.getVarBytes(2)
return self
def __format__(self, formatstr):
"""Return human-readable representation of the object."""
ticket = format_bytearray(self.ticket, formatstr)
return "NewSessionTicket(ticket_lifetime_hint={0}, ticket={1})"\
.format(self.ticket_lifetime_hintt, ticket)
class CertificateStatus(messages.HandshakeMsg):
"""Class for handling the CertificateStatus OCSP staples from RFC 4366."""
def __init__(self):
"""Create a certificate status message handling object."""
super(CertificateStatus, self).__init__(
HandshakeType.certificate_status)
self.status_type = 0
self.response = None
def parse(self, parser):
"""Deserialise certificate status message from binary data."""
parser.startLengthCheck(3)
self.status_type = parser.get(1)
if self.status_type == 1: # FIXME, create registry
self.response = parser.getVarBytes(3)
else:
raise SyntaxError() # FIXME, use sane-er type
parser.stopLengthCheck()
return self
def __format__(self, formatstr):
"""Return human-readable representation of certificate status."""
response = format_bytearray(self.response, formatstr)
return "CertificateStatus(status_type={0}, response={1})"\
.format(self.status_type, response)
class Message(messages.Message):
"""Message class with more robust formatting capability."""
def __format__(self, formatstr):
"""Advanced formatting for messages."""
hexify = False
verbose = ""
if 'h' in formatstr:
hexify = True
if 'v' in formatstr:
verbose = "ContentType."
if hexify:
data = b2a_hex(self.data)
else:
data = repr(self.data)
return "Message(contentType={0}{1}, data={2})"\
.format(verbose, ContentType.toStr(self.contentType), data)
class ServerKeyExchange(messages.ServerKeyExchange):
"""ServerKeyExchange class with more robust formatting capability."""
def parse(self, parser):
"""more robust parser for SKE"""
try:
super(ServerKeyExchange, self).parse(parser)
except AssertionError:
pass
return self
def __format__(self, formatstr):
"""Return human-readable representation of the object."""
if 'v' in formatstr:
verbose = "CipherSuite."
else:
verbose = ""
ret = "ServerKeyExchange(cipherSuite={0}{1}, version={2}"\
.format(verbose, CipherSuite.ietfNames[self.cipherSuite],
self.version)
if self.srp_N:
ret += ", srp_N={0}, srp_g={1}, srp_s={2}, srp_B={3}"\
.format(self.srp_N, self.srp_g, self.srp_s, self.srp_B)
if self.dh_p:
ret += ", dh_p={0}, dh_g={1}, dh_Ys={2}"\
.format(self.dh_p, self.dh_g, self.dh_Ys)
if self.ecdh_Ys:
ecdh_Ys = format_bytearray(self.ecdh_Ys, formatstr)
ret += ", curve_type={0}, named_curve={1}, ecdh_Ys={2}"\
.format(ECCurveType.toStr(self.curve_type),
GroupName.toStr(self.named_curve), ecdh_Ys)
if self.signAlg:
ret += ", hashAlg={0}, signAlg={1}"\
.format(HashAlgorithm.toStr(self.hashAlg),
SignatureAlgorithm.toStr(self.signAlg))
if self.signature:
ret += ", signature={0}"\
.format(format_bytearray(self.signature, formatstr))
return ret + ")"

29
cscan/modifiers.py Normal file
View File

@ -0,0 +1,29 @@
# Copyright (c) 2016 Hubert Kario
# Released under Mozilla Public License 2.0
"""Methods for modifying the scan configurations on the fly."""
from __future__ import print_function
proto_versions = {(3, 0): "SSLv3",
(3, 1): "TLSv1.0",
(3, 2): "TLSv1.1",
(3, 3): "TLSv1.2",
(3, 4): "TLSv1.3",
(3, 5): "TLSv1.4",
(3, 6): "TLSv1.5"}
def version_to_str(version):
"""Convert a version tuple to human-readable string."""
version_name = proto_versions.get(version, None)
if version_name is None:
version_name = "{0[0]}.{0[1]}".format(version)
return version_name
def set_hello_version(generator, version):
"""Set client hello version."""
generator.version = version
generator.modifications += [version_to_str(version)]
return generator

157
cscan/scanner.py Normal file
View File

@ -0,0 +1,157 @@
# Copyright (c) 2016 Hubert Kario
# Released under the Mozilla Public License 2.0
"""Classes used for scanning servers and getting their responses."""
import socket
from .constants import CipherSuite, HandshakeType
from .messages import Certificate, ServerHello, Message, NewSessionTicket, \
CertificateStatus, ServerKeyExchange
from tlslite.constants import CertificateType, ContentType
from tlslite.messages import \
CertificateRequest, NextProtocol, ServerHelloDone, Alert
from tlslite.defragmenter import Defragmenter
from tlslite.messagesocket import MessageSocket
from tlslite.errors import TLSAbruptCloseError, TLSIllegalParameterException
class HandshakeParser(object):
"""Inteligent parser for handshake messages."""
def __init__(self, version=(3, 1),
cipher_suite=CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
certificate_type=CertificateType.x509):
"""Initialize parser object."""
self.version = version
self.cipher_suite = cipher_suite
self.certificate_type = certificate_type
def parse(self, parser):
"""Parse a handshake message."""
hs_type = parser.get(1)
if hs_type == HandshakeType.server_hello:
msg = ServerHello().parse(parser)
self.version = msg.server_version
self.cipher_suite = msg.cipher_suite
self.certificate_type = msg.certificate_type
return msg
elif hs_type == HandshakeType.certificate:
msg = Certificate(self.certificate_type)
elif hs_type == HandshakeType.server_key_exchange:
msg = ServerKeyExchange(self.cipher_suite, self.version)
elif hs_type == HandshakeType.certificate_request:
msg = CertificateRequest(self.version)
elif hs_type == HandshakeType.next_protocol:
msg = NextProtocol().parse(parser)
elif hs_type == HandshakeType.server_hello_done:
msg = ServerHelloDone()
elif hs_type == HandshakeType.session_ticket:
msg = NewSessionTicket()
elif hs_type == HandshakeType.certificate_status:
msg = CertificateStatus()
else:
raise ValueError("Unknown handshake type: {0}".format(hs_type))
# don't abort when we can't parse a message, save it as unparsed
try:
msg.parse(parser)
except SyntaxError:
msg = Message(ContentType.handshake, parser.bytes)
return msg
class Scanner(object):
"""Helper class for scanning a host and returning serialised responses."""
def __init__(self, hello_gen, host, port=443, hostname=None):
"""Initialize scanner."""
self.host = host
self.hello_gen = hello_gen
self.port = port
self.hostname = hostname
def scan(self):
"""Perform a scan on server."""
defragger = Defragmenter()
defragger.addStaticSize(ContentType.change_cipher_spec, 1)
defragger.addStaticSize(ContentType.alert, 2)
defragger.addDynamicSize(ContentType.handshake, 1, 3)
try:
raw_sock = socket.create_connection((self.host, self.port), 5)
except socket.error as e:
return [e]
sock = MessageSocket(raw_sock, defragger)
if self.hostname is not None:
client_hello = self.hello_gen(bytearray(self.hostname,
'utf-8'))
else:
client_hello = self.hello_gen(None)
# record layer version - TLSv1.x
# use the version from configuration, if present, or default to the
# RFC recommended (3, 1) for TLS and (3, 0) for SSLv3
if hasattr(client_hello, 'record_version'):
sock.version = client_hello.record_version
elif hasattr(self.hello_gen, 'record_version'):
sock.version = self.hello_gen.record_version
elif client_hello.client_version > (3, 1): # TLS1.0
sock.version = (3, 1)
else:
sock.version = client_hello.client_version
# we don't want to send invalid messages (SSLv2 hello in SSL record
# layer), so set the record layer version to SSLv2 if the hello is
# of SSLv2 format
if client_hello.ssl2:
sock.version = (0, 2)
# save the record version used in the end for later analysis
client_hello.record_version = sock.version
messages = [client_hello]
handshake_parser = HandshakeParser()
try:
sock.sendMessageBlocking(client_hello)
except socket.error as e:
messages.append(e)
return messages
except TLSAbruptCloseError as e:
sock.sock.close()
messages.append(e)
return messages
# get all the server messages that affect connection, abort as soon
# as they've been read
try:
while True:
header, parser = sock.recvMessageBlocking()
if header.type == ContentType.alert:
alert = Alert()
alert.parse(parser)
alert.record_version = header.version
messages += [alert]
elif header.type == ContentType.handshake:
msg = handshake_parser.parse(parser)
msg.record_version = header.version
messages += [msg]
if isinstance(msg, ServerHelloDone):
return messages
else:
raise TypeError("Unknown content type: {0}"
.format(header.type))
except (TLSAbruptCloseError, TLSIllegalParameterException,
ValueError, TypeError, socket.error, SyntaxError) as e:
messages += [e]
return messages
finally:
try:
sock.sock.close()
except (socket.error, OSError):
pass

0
cscan_tests/__init__.py Normal file
View File

View File

@ -0,0 +1,50 @@
# Copyright (c) 2015 Hubert Kario
# Released under Mozilla Public License Version 2.0
try:
import unittest2 as unittest
except ImportError:
import unittest
from tlslite.messages import ClientHello
from tlslite.extensions import SNIExtension, SupportedGroupsExtension, \
ECPointFormatsExtension, NPNExtension, SignatureAlgorithmsExtension
from tlslite.utils.codec import Parser
from cscan.config import Firefox_42
from cscan.extensions import RenegotiationExtension
from cscan.constants import ExtensionType
class TestFirefox(unittest.TestCase):
def test_firefox_42(self):
gen = Firefox_42()
ch = gen(bytearray(b'example.com'))
self.assertIsNotNone(ch)
self.assertIsInstance(ch, ClientHello)
self.assertEqual(len(ch.write()), 176)
self.assertEqual(ch.client_version, (3, 3))
self.assertEqual(gen.record_version, (3, 1))
self.assertEqual(len(ch.cipher_suites), 11)
self.assertIsInstance(ch.extensions[0], SNIExtension)
self.assertEqual(ch.extensions[1].extType,
ExtensionType.renegotiation_info)
self.assertIsInstance(ch.extensions[2],
SupportedGroupsExtension)
self.assertIsInstance(ch.extensions[3],
ECPointFormatsExtension)
self.assertEqual(ch.extensions[4].extType,
ExtensionType.session_ticket)
# bug in tlslite-ng, removes NPN extensions from provided extensions
#self.assertIsInstance(ch.extensions[5],
# NPNExtension)
self.assertEqual(ch.extensions[5].extType,
ExtensionType.alpn)
self.assertEqual(ch.extensions[6].extType,
ExtensionType.status_request)
self.assertIsInstance(ch.extensions[7],
SignatureAlgorithmsExtension)
self.assertEqual(ch.compression_methods, [0])
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,76 @@
# Copyright (c) 2015 Hubert Kario
# Released under Mozilla Public License Version 2.0
try:
import unittest2 as unittest
except ImportError:
import unittest
from tlslite.utils.codec import Parser
from cscan.extensions import KeyShareExtension
from cscan.constants import GroupName
class TestKeyShareExtension(unittest.TestCase):
def test___init__(self):
ext = KeyShareExtension()
self.assertIsNotNone(ext)
def test_create(self):
ext = KeyShareExtension()
ext.create([(1, bytearray(b'\x12')),
(2, bytearray(b'\x33'))])
self.assertEqual(ext.client_shares, [(1, bytearray(b'\x12')),
(2, bytearray(b'\x33'))])
def test_write(self):
ext = KeyShareExtension()
ext.create([(GroupName.secp256r1, bytearray(b'\xff\xfa')),
(GroupName.ffdhe2048, bytearray(b'\xaf\xaa'))])
data = ext.write()
self.assertEqual(data, bytearray(
b'\x00\x2a\x00\x0d'
b'\x00\x0b'
b'\x00\x17\x02\xff\xfa'
b'\x01\x00\x00\x02\xaf\xaa'))
def test_write_with_no_data(self):
ext = KeyShareExtension()
data = ext.write()
self.assertEqual(data, bytearray(b'\x00\x2a\x00\x00'))
def test_parse(self):
parser = Parser(bytearray(
#b'\x00\x2a\x00\x0d'
b'\x00\x0b'
b'\x00\x17\x02\xff\xfa'
b'\x01\x00\x00\x02\xaf\xaa'))
ext = KeyShareExtension()
ext.parse(parser)
self.assertEqual(ext.client_shares,
[(GroupName.secp256r1, bytearray(b'\xff\xfa')),
(GroupName.ffdhe2048, bytearray(b'\xaf\xaa'))])
def test_parse_with_no_data(self):
parser = Parser(bytearray())
ext = KeyShareExtension()
ext.parse(parser)
self.assertIsNone(ext.client_shares)
def test___repr__(self):
ext = KeyShareExtension()
ext.create([(1, bytearray(b'\xff'))])
self.assertEqual("KeyShareExtension([(1, bytearray(b\'\\xff\'))])",
repr(ext))