mirror of
https://github.com/mozilla/cipherscan.git
synced 2024-11-22 06:13:42 +01:00
Merge pull request #128 from tomato42/intolerance-tests
TLS version (in)tolerance scanner
This commit is contained in:
commit
b1d37bf26d
8
.gitignore
vendored
8
.gitignore
vendored
@ -1,2 +1,10 @@
|
|||||||
mozilla/*
|
mozilla/*
|
||||||
top1m/results/*
|
top1m/results/*
|
||||||
|
tlslite
|
||||||
|
.tlslite-ng
|
||||||
|
*/__pycache__
|
||||||
|
*.pyc
|
||||||
|
.coverage
|
||||||
|
.python-ecdsa
|
||||||
|
ecdsa
|
||||||
|
tlslite
|
||||||
|
33
cipherscan
33
cipherscan
@ -980,6 +980,8 @@ display_results_in_terminal() {
|
|||||||
done | sort
|
done | sort
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
echo "$cscan_tests"
|
||||||
}
|
}
|
||||||
|
|
||||||
display_results_in_json() {
|
display_results_in_json() {
|
||||||
@ -1080,7 +1082,12 @@ display_results_in_json() {
|
|||||||
echo -n "}"
|
echo -n "}"
|
||||||
ctr=$((ctr+1))
|
ctr=$((ctr+1))
|
||||||
done
|
done
|
||||||
echo '}}'
|
echo -n '}'
|
||||||
|
if [[ -n $cscan_tests ]]; then
|
||||||
|
echo -n ',"intolerancies":'
|
||||||
|
echo -n "$cscan_tests"
|
||||||
|
fi
|
||||||
|
echo '}'
|
||||||
}
|
}
|
||||||
|
|
||||||
test_serverside_ordering() {
|
test_serverside_ordering() {
|
||||||
@ -1550,6 +1557,24 @@ test_tls_tolerance() {
|
|||||||
tls_tolerance['small-SSLv3']="True $current_protocol $current_cipher $current_trusted"
|
tls_tolerance['small-SSLv3']="True $current_protocol $current_cipher $current_trusted"
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
# finally run the Python based test to perform more precise scan
|
||||||
|
options=()
|
||||||
|
options+=("$DIRNAMEPATH/cscan.sh")
|
||||||
|
if [[ "$OUTPUTFORMAT" == "json" ]]; then
|
||||||
|
options+=(-j)
|
||||||
|
else
|
||||||
|
if [[ $VERBOSE != 0 ]]; then
|
||||||
|
options+=("-v" "--no-header")
|
||||||
|
else
|
||||||
|
options+=("--no-header")
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
options+=("$TARGET")
|
||||||
|
if [[ -n $sni_target ]]; then
|
||||||
|
options+=("$sni_target")
|
||||||
|
fi
|
||||||
|
cscan_tests="$(${options[*]})"
|
||||||
}
|
}
|
||||||
|
|
||||||
test_kex_sigalgs() {
|
test_kex_sigalgs() {
|
||||||
@ -2101,6 +2126,11 @@ if [[ $VERBOSE != 0 ]] ; then
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
SCLIENTARGS="${PARAMS[*]}"
|
SCLIENTARGS="${PARAMS[*]}"
|
||||||
|
# we need the SNI for cscan, so save it if it was provided through
|
||||||
|
# OpenSSL options
|
||||||
|
if [[ $SCLIENTARGS =~ servername[\ ]*([^\ ]*)[\ ]* ]]; then
|
||||||
|
sni_target="${BASH_REMATCH[1]}"
|
||||||
|
fi
|
||||||
# only append the SNI:
|
# only append the SNI:
|
||||||
# if the target is a hostname by validating the tld
|
# if the target is a hostname by validating the tld
|
||||||
# if -servername was not supplied by the user
|
# if -servername was not supplied by the user
|
||||||
@ -2109,6 +2139,7 @@ if [[ $SNI == "True" && ! $SCLIENTARGS =~ servername ]]; then
|
|||||||
SCLIENTARGS="$SCLIENTARGS -servername $sni_target"
|
SCLIENTARGS="$SCLIENTARGS -servername $sni_target"
|
||||||
else
|
else
|
||||||
echo "Warning: target is not a FQDN. SNI was disabled. Use a FQDN or '-servername <fqdn>'" 1>&2
|
echo "Warning: target is not a FQDN. SNI was disabled. Use a FQDN or '-servername <fqdn>'" 1>&2
|
||||||
|
sni_target=''
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
debug "sclientargs: $SCLIENTARGS"
|
debug "sclientargs: $SCLIENTARGS"
|
||||||
|
265
cscan.py
Normal file
265
cscan.py
Normal file
@ -0,0 +1,265 @@
|
|||||||
|
# Copyright 2016(c) Hubert Kario
|
||||||
|
# This work is released under the Mozilla Public License Version 2.0
|
||||||
|
"""tlslite-ng based server configuration (and bug) scanner."""
|
||||||
|
|
||||||
|
from __future__ import print_function
|
||||||
|
from tlslite.messages import ClientHello, ServerHello, ServerHelloDone, Alert
|
||||||
|
from tlslite.constants import CipherSuite, \
|
||||||
|
AlertLevel
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import getopt
|
||||||
|
import itertools
|
||||||
|
|
||||||
|
from cscan.scanner import Scanner
|
||||||
|
from cscan.config import Firefox_42
|
||||||
|
from cscan.modifiers import set_hello_version
|
||||||
|
|
||||||
|
|
||||||
|
def scan_with_config(host, port, conf, hostname, __sentry=None, __cache={}):
|
||||||
|
"""Connect to server and return set of exchanged messages."""
|
||||||
|
assert __sentry is None
|
||||||
|
key = (host, port, conf, hostname)
|
||||||
|
if key in __cache:
|
||||||
|
if verbose and not json_out:
|
||||||
|
print(":", end='')
|
||||||
|
return __cache[key]
|
||||||
|
|
||||||
|
scanner = Scanner(conf, host, port, hostname)
|
||||||
|
ret = scanner.scan()
|
||||||
|
__cache[key] = ret
|
||||||
|
if verbose and not json_out:
|
||||||
|
print(".", end='')
|
||||||
|
sys.stdout.flush()
|
||||||
|
return ret
|
||||||
|
|
||||||
|
|
||||||
|
def simple_inspector(result):
|
||||||
|
"""
|
||||||
|
Perform simple check to see if connection was successful.
|
||||||
|
|
||||||
|
Returns True is connection was successful, server replied with
|
||||||
|
ServerHello and ServerHelloDone messages, and the cipher selected
|
||||||
|
was present in ciphers advertised by client, False otherwise
|
||||||
|
"""
|
||||||
|
if any(isinstance(x, ServerHelloDone) for x in result):
|
||||||
|
ch = next((x for x in result if isinstance(x, ClientHello)), None)
|
||||||
|
sh = next((x for x in result if isinstance(x, ServerHello)), None)
|
||||||
|
if ch and sh:
|
||||||
|
if sh.cipher_suite not in ch.cipher_suites:
|
||||||
|
# FAILURE cipher suite mismatch
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
# incomplete response or error
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def verbose_inspector(desc, result):
|
||||||
|
"""Describe the connection result in human-readable form."""
|
||||||
|
ret = "{0}:".format(desc)
|
||||||
|
if any(isinstance(x, ServerHelloDone) for x in result):
|
||||||
|
ch = next((x for x in result if isinstance(x, ClientHello)), None)
|
||||||
|
sh = next((x for x in result if isinstance(x, ServerHello)), None)
|
||||||
|
if sh and ch:
|
||||||
|
if sh.cipher_suite not in ch.cipher_suites:
|
||||||
|
ret += " FAILURE cipher suite mismatch"
|
||||||
|
return ret
|
||||||
|
name = CipherSuite.ietfNames[sh.cipher_suite] \
|
||||||
|
if sh.cipher_suite in CipherSuite.ietfNames \
|
||||||
|
else hex(sh.cipher_suite)
|
||||||
|
ret += " ok: {0}, {1}".format(sh.server_version,
|
||||||
|
name)
|
||||||
|
return ret
|
||||||
|
ret += " FAILURE "
|
||||||
|
errors = []
|
||||||
|
for msg in result:
|
||||||
|
if isinstance(msg, ClientHello):
|
||||||
|
continue
|
||||||
|
# check if returned message supports custom formatting
|
||||||
|
if msg.__class__.__format__ is not object.__format__:
|
||||||
|
errors += ["{:vxm}".format(msg)]
|
||||||
|
else:
|
||||||
|
errors += [repr(msg)]
|
||||||
|
# skip printing close errors after fatal alerts, they are expected
|
||||||
|
if isinstance(msg, Alert) and msg.level == AlertLevel.fatal:
|
||||||
|
break
|
||||||
|
ret += "\n".join(errors)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
configs = {}
|
||||||
|
|
||||||
|
|
||||||
|
def load_configs():
|
||||||
|
"""Load known client configurations for later use in scanning."""
|
||||||
|
base_configs = [Firefox_42]
|
||||||
|
for conf in base_configs:
|
||||||
|
for version in ((3, 1), (3, 2), (3, 3), (3, 4), (3, 5), (3, 254)):
|
||||||
|
if conf().version != version:
|
||||||
|
# just changed version
|
||||||
|
gen = set_hello_version(conf(), version)
|
||||||
|
if gen.record_version > version:
|
||||||
|
gen.record_version = version
|
||||||
|
configs[gen.name] = gen
|
||||||
|
|
||||||
|
# Firefox 42 configs
|
||||||
|
gen = Firefox_42()
|
||||||
|
configs[gen.name] = gen
|
||||||
|
|
||||||
|
|
||||||
|
def scan_TLS_intolerancies(host, port, hostname):
|
||||||
|
"""Look for intolerancies (version, extensions, ...) in a TLS server."""
|
||||||
|
results = {}
|
||||||
|
|
||||||
|
def result_iterator(predicate):
|
||||||
|
"""
|
||||||
|
Selecting iterator over cached results.
|
||||||
|
|
||||||
|
Looks for matching result from already performed scans
|
||||||
|
"""
|
||||||
|
return (not simple_inspector(results[name]) for name in results
|
||||||
|
if predicate(configs[name]))
|
||||||
|
|
||||||
|
def result_cache(name, conf):
|
||||||
|
"""Perform scan if config is not in results, caches result."""
|
||||||
|
return results[name] if name in results \
|
||||||
|
else results.setdefault(name, scan_with_config(host, port, conf,
|
||||||
|
hostname))
|
||||||
|
|
||||||
|
def conf_iterator(predicate):
|
||||||
|
"""
|
||||||
|
Caching, selecting iterator over configs.
|
||||||
|
|
||||||
|
Returns an iterator that will go over configs that match the provided
|
||||||
|
predicate (a function that returns true or false depending if given
|
||||||
|
config is ok for test at hand) while saving the results to the
|
||||||
|
cache/verbose `results` log/dictionary
|
||||||
|
|
||||||
|
The iterator returns False for every connection that succeeded
|
||||||
|
(meaning the server is NOT intolerant to config and True to mean
|
||||||
|
that server IS intolerant to config.
|
||||||
|
"""
|
||||||
|
scan_iter = (not simple_inspector(result_cache(name, conf))
|
||||||
|
for name, conf in configs.items()
|
||||||
|
if predicate(conf))
|
||||||
|
return itertools.chain(result_iterator(predicate), scan_iter)
|
||||||
|
|
||||||
|
host_up = not all(conf_iterator(lambda conf: True))
|
||||||
|
|
||||||
|
intolerancies = {}
|
||||||
|
if not host_up:
|
||||||
|
if json_out:
|
||||||
|
print(json.dumps(intolerancies))
|
||||||
|
else:
|
||||||
|
print("Host does not seem to support SSL or TLS protocol")
|
||||||
|
return
|
||||||
|
|
||||||
|
intolerancies["SSL 3.254"] = all(conf_iterator(lambda conf:
|
||||||
|
conf.version == (3, 254)))
|
||||||
|
intolerancies["TLS 1.4"] = all(conf_iterator(lambda conf:
|
||||||
|
conf.version == (3, 5)))
|
||||||
|
intolerancies["TLS 1.3"] = all(conf_iterator(lambda conf:
|
||||||
|
conf.version == (3, 4)))
|
||||||
|
intolerancies["TLS 1.2"] = all(conf_iterator(lambda conf:
|
||||||
|
conf.version == (3, 3)))
|
||||||
|
intolerancies["TLS 1.1"] = all(conf_iterator(lambda conf:
|
||||||
|
conf.version == (3, 2)))
|
||||||
|
intolerancies["TLS 1.0"] = all(conf_iterator(lambda conf:
|
||||||
|
conf.version == (3, 1)))
|
||||||
|
|
||||||
|
if json_out:
|
||||||
|
print(json.dumps(intolerancies))
|
||||||
|
else:
|
||||||
|
if not no_header:
|
||||||
|
if verbose:
|
||||||
|
print()
|
||||||
|
print("Host {0}:{1} scan complete".format(host, port))
|
||||||
|
if hostname:
|
||||||
|
print("SNI hostname used: {0}".format(hostname))
|
||||||
|
if verbose:
|
||||||
|
print()
|
||||||
|
print("Individual probe results:")
|
||||||
|
for desc, ret in sorted(results.items()):
|
||||||
|
print(verbose_inspector(desc, ret))
|
||||||
|
|
||||||
|
print()
|
||||||
|
print("Intolerance to:")
|
||||||
|
for intolerance, value in sorted(intolerancies.items()):
|
||||||
|
print(" {0:20}: {1}".format(intolerance,
|
||||||
|
"PRESENT" if value else "absent"))
|
||||||
|
|
||||||
|
|
||||||
|
def single_probe(name):
|
||||||
|
"""Run a single probe against a server, print result."""
|
||||||
|
print(verbose_inspector(name, scan_with_config(host, port,
|
||||||
|
configs[name], hostname)))
|
||||||
|
|
||||||
|
|
||||||
|
def usage():
|
||||||
|
"""Print usage information."""
|
||||||
|
print("./cscan.py [ARGUMENTS] host[:port] [SNI-HOST-NAME]")
|
||||||
|
print()
|
||||||
|
print("-l, --list List probe names")
|
||||||
|
print("-p name, --probe Run just a single probe")
|
||||||
|
print("-j, --json Output in JSON format")
|
||||||
|
print("-v, --verbose Use verbose output")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
opts, args = getopt.getopt(sys.argv[1:],
|
||||||
|
"jvhlp:",
|
||||||
|
["json", "verbose", "help", "list",
|
||||||
|
"probe=", "no-header"])
|
||||||
|
except getopt.GetoptError as err:
|
||||||
|
print(err)
|
||||||
|
usage()
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
json_out = False
|
||||||
|
verbose = False
|
||||||
|
list_probes = False
|
||||||
|
run_probe = None
|
||||||
|
no_header = False
|
||||||
|
|
||||||
|
for opt, arg in opts:
|
||||||
|
if opt in ('-j', '--json'):
|
||||||
|
json_out = True
|
||||||
|
elif opt in ('-v', '--verbose'):
|
||||||
|
verbose = True
|
||||||
|
elif opt in ('-h', '--help'):
|
||||||
|
usage()
|
||||||
|
sys.exit(0)
|
||||||
|
elif opt in ('-l', '--list'):
|
||||||
|
list_probes = True
|
||||||
|
elif opt in ('-p', '--probe'):
|
||||||
|
run_probe = arg
|
||||||
|
elif opt in ('--no-header', ):
|
||||||
|
no_header = True
|
||||||
|
else:
|
||||||
|
raise AssertionError("Unknown option {0}".format(opt))
|
||||||
|
|
||||||
|
if len(args) > 2:
|
||||||
|
print("Too many arguments")
|
||||||
|
usage()
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
load_configs()
|
||||||
|
|
||||||
|
if list_probes:
|
||||||
|
for desc, ret in sorted(configs.items()):
|
||||||
|
print("{0}: {1}".format(desc, ret.__doc__))
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
hostname = None
|
||||||
|
if len(args) == 2:
|
||||||
|
hostname = args[1]
|
||||||
|
hostaddr = args[0].split(":")
|
||||||
|
if len(hostaddr) > 1:
|
||||||
|
host, port = hostaddr
|
||||||
|
else:
|
||||||
|
host = hostaddr[0]
|
||||||
|
port = 443
|
||||||
|
|
||||||
|
if run_probe:
|
||||||
|
single_probe(run_probe)
|
||||||
|
else:
|
||||||
|
scan_TLS_intolerancies(host, port, hostname)
|
26
cscan.sh
Executable file
26
cscan.sh
Executable file
@ -0,0 +1,26 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
pushd "$(dirname ${BASH_SOURCE[0]})" > /dev/null
|
||||||
|
if [ ! -d ./tlslite ]; then
|
||||||
|
git clone --depth=1 https://github.com/tomato42/tlslite-ng.git .tlslite-ng
|
||||||
|
ln -s .tlslite-ng/tlslite tlslite
|
||||||
|
fi
|
||||||
|
if [ ! -d ./ecdsa ]; then
|
||||||
|
git clone --depth=1 https://github.com/warner/python-ecdsa.git .python-ecdsa
|
||||||
|
ln -s .python-ecdsa/ecdsa ecdsa
|
||||||
|
fi
|
||||||
|
|
||||||
|
# update the code if it is running in interactive terminal
|
||||||
|
#if [[ -t 1 ]]; then
|
||||||
|
if [[ $UPDATE ]]; then
|
||||||
|
pushd .tlslite-ng >/dev/null
|
||||||
|
git pull origin master --quiet
|
||||||
|
popd >/dev/null
|
||||||
|
pushd .python-ecdsa >/dev/null
|
||||||
|
git pull origin master --quiet
|
||||||
|
popd >/dev/null
|
||||||
|
fi
|
||||||
|
|
||||||
|
PYTHONPATH=. python cscan.py "$@"
|
||||||
|
ret=$?
|
||||||
|
popd > /dev/null
|
||||||
|
exit $ret
|
0
cscan/__init__.py
Normal file
0
cscan/__init__.py
Normal file
134
cscan/config.py
Normal file
134
cscan/config.py
Normal file
@ -0,0 +1,134 @@
|
|||||||
|
# Copyright (c) 2016 Hubert Kario <hkario@redhat.com>
|
||||||
|
# Released under Mozilla Public License Version 2.0
|
||||||
|
|
||||||
|
"""Typical Client Hello messages sent by different clients."""
|
||||||
|
|
||||||
|
import random
|
||||||
|
from tlslite.messages import ClientHello
|
||||||
|
from tlslite.constants import \
|
||||||
|
ECPointFormat, HashAlgorithm, SignatureAlgorithm
|
||||||
|
from tlslite.extensions import SNIExtension, SupportedGroupsExtension, \
|
||||||
|
TLSExtension, SignatureAlgorithmsExtension, NPNExtension, \
|
||||||
|
ECPointFormatsExtension
|
||||||
|
from tlslite.utils.cryptomath import numberToByteArray
|
||||||
|
from .constants import CipherSuite, ExtensionType, GroupName
|
||||||
|
|
||||||
|
|
||||||
|
class HelloConfig(object):
|
||||||
|
"""Base object for all Client Hello configurations."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Initialize object with default settings."""
|
||||||
|
self._name = None
|
||||||
|
self.modifications = []
|
||||||
|
self.callbacks = []
|
||||||
|
self.version = (3, 3)
|
||||||
|
self.record_version = (3, 0)
|
||||||
|
self.ciphers = []
|
||||||
|
self.extensions = None
|
||||||
|
self.random = None
|
||||||
|
self.session_id = bytearray(0)
|
||||||
|
self.compression_methods = [0]
|
||||||
|
self.ssl2 = False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
"""Return the name of config with all the modifications applied."""
|
||||||
|
if self.modifications:
|
||||||
|
return "{0} ({1})".format(self._name,
|
||||||
|
", ".join(self.modifications))
|
||||||
|
else:
|
||||||
|
return self._name
|
||||||
|
|
||||||
|
@name.setter
|
||||||
|
def name(self, value):
|
||||||
|
"""Set the base name of the configuration."""
|
||||||
|
self._name = value
|
||||||
|
|
||||||
|
def __call__(self, hostname):
|
||||||
|
"""Generate a client hello object, use hostname in SNI extension."""
|
||||||
|
# SNI is special in that we don't want to send it if it is empty
|
||||||
|
if self.extensions:
|
||||||
|
sni = next((x for x in self.extensions
|
||||||
|
if isinstance(x, SNIExtension)),
|
||||||
|
None)
|
||||||
|
if sni:
|
||||||
|
if hostname is not None:
|
||||||
|
if sni.serverNames is None:
|
||||||
|
sni.serverNames = []
|
||||||
|
sni.hostNames = [hostname]
|
||||||
|
else:
|
||||||
|
# but if we were not provided with a host name, we want
|
||||||
|
# to remove empty extension
|
||||||
|
if sni.serverNames is None:
|
||||||
|
self.extensions = [x for x in self.extensions
|
||||||
|
if not isinstance(x, SNIExtension)]
|
||||||
|
|
||||||
|
if self.random:
|
||||||
|
rand = self.random
|
||||||
|
else:
|
||||||
|
# we're not doing any crypto with it, just need "something"
|
||||||
|
# TODO: place unix time at the beginning
|
||||||
|
rand = numberToByteArray(random.getrandbits(256), 32)
|
||||||
|
|
||||||
|
ch = ClientHello(self.ssl2).create(self.version, rand, self.session_id,
|
||||||
|
self.ciphers,
|
||||||
|
extensions=self.extensions)
|
||||||
|
ch.compression_methods = self.compression_methods
|
||||||
|
for cb in self.callbacks:
|
||||||
|
ch = cb(ch)
|
||||||
|
return ch
|
||||||
|
|
||||||
|
|
||||||
|
class Firefox_42(HelloConfig):
|
||||||
|
"""Create Client Hello like Firefox 42."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Set the configuration to Firefox 42."""
|
||||||
|
super(Firefox_42, self).__init__()
|
||||||
|
self._name = "Firefox 42"
|
||||||
|
self.version = (3, 3)
|
||||||
|
self.record_version = (3, 1)
|
||||||
|
self.ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
|
||||||
|
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
|
||||||
|
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
|
||||||
|
CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
|
||||||
|
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
|
||||||
|
CipherSuite.TLS_DHE_RSA_WITH_AES_256_CBC_SHA,
|
||||||
|
CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
|
||||||
|
CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
|
||||||
|
CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA]
|
||||||
|
ext = self.extensions = []
|
||||||
|
ext.append(SNIExtension())
|
||||||
|
ext.append(TLSExtension(extType=ExtensionType.renegotiation_info)
|
||||||
|
.create(bytearray(1)))
|
||||||
|
ext.append(SupportedGroupsExtension().create([GroupName.secp256r1,
|
||||||
|
GroupName.secp384r1,
|
||||||
|
GroupName.secp521r1]))
|
||||||
|
ext.append(ECPointFormatsExtension()
|
||||||
|
.create([ECPointFormat.uncompressed]))
|
||||||
|
ext.append(TLSExtension(extType=ExtensionType.session_ticket))
|
||||||
|
ext.append(NPNExtension())
|
||||||
|
ext.append(TLSExtension(extType=ExtensionType.alpn)
|
||||||
|
.create(bytearray(b'\x00\x15' +
|
||||||
|
b'\x02' + b'h2' +
|
||||||
|
b'\x08' + b'spdy/3.1' +
|
||||||
|
b'\x08' + b'http/1.1')))
|
||||||
|
ext.append(TLSExtension(extType=ExtensionType.status_request)
|
||||||
|
.create(bytearray(b'\x01' +
|
||||||
|
b'\x00\x00' +
|
||||||
|
b'\x00\x00')))
|
||||||
|
sig_algs = []
|
||||||
|
for alg in ['sha256', 'sha384', 'sha512', 'sha1']:
|
||||||
|
sig_algs.append((getattr(HashAlgorithm, alg),
|
||||||
|
SignatureAlgorithm.rsa))
|
||||||
|
for alg in ['sha256', 'sha384', 'sha512', 'sha1']:
|
||||||
|
sig_algs.append((getattr(HashAlgorithm, alg),
|
||||||
|
SignatureAlgorithm.ecdsa))
|
||||||
|
for alg in ['sha256', 'sha1']:
|
||||||
|
sig_algs.append((getattr(HashAlgorithm, alg),
|
||||||
|
SignatureAlgorithm.dsa))
|
||||||
|
ext.append(SignatureAlgorithmsExtension()
|
||||||
|
.create(sig_algs))
|
165
cscan/constants.py
Normal file
165
cscan/constants.py
Normal file
@ -0,0 +1,165 @@
|
|||||||
|
# Copyright 2016(c) Hubert Kario
|
||||||
|
# This work is released under the Mozilla Public License Version 2.0
|
||||||
|
"""Extend the tlslite-ng constants with values it does not support."""
|
||||||
|
|
||||||
|
import tlslite.constants
|
||||||
|
|
||||||
|
from tlslite.constants import CipherSuite
|
||||||
|
|
||||||
|
CipherSuite.ecdheEcdsaSuites = []
|
||||||
|
|
||||||
|
# RFC 5289
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384 = 0xC02C
|
||||||
|
CipherSuite.ietfNames[0xC02C] = 'TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384'
|
||||||
|
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
|
||||||
|
TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384)
|
||||||
|
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 = 0xC02B
|
||||||
|
CipherSuite.ietfNames[0xC02B] = 'TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256'
|
||||||
|
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
|
||||||
|
TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256)
|
||||||
|
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384 = 0xC024
|
||||||
|
CipherSuite.ietfNames[0xC024] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384'
|
||||||
|
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
|
||||||
|
TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384)
|
||||||
|
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256 = 0xC023
|
||||||
|
CipherSuite.ietfNames[0xC023] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256'
|
||||||
|
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
|
||||||
|
TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256)
|
||||||
|
|
||||||
|
# RFC 4492
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA = 0xC00A
|
||||||
|
CipherSuite.ietfNames[0xC00A] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA'
|
||||||
|
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
|
||||||
|
TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA)
|
||||||
|
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA = 0xC009
|
||||||
|
CipherSuite.ietfNames[0xC009] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA'
|
||||||
|
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
|
||||||
|
TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA)
|
||||||
|
|
||||||
|
# RFC 7251
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CCM = 0xC0Ad
|
||||||
|
CipherSuite.ietfNames[0xC0AD] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CCM'
|
||||||
|
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
|
||||||
|
TLS_ECDHE_ECDSA_WITH_AES_256_CCM)
|
||||||
|
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM = 0xC0AC
|
||||||
|
CipherSuite.ietfNames[0xC0AC] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CCM'
|
||||||
|
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
|
||||||
|
TLS_ECDHE_ECDSA_WITH_AES_128_CCM)
|
||||||
|
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8 = 0xC0AF
|
||||||
|
CipherSuite.ietfNames[0xC0AF] = 'TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8'
|
||||||
|
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
|
||||||
|
TLS_ECDHE_ECDSA_WITH_AES_256_CCM_8)
|
||||||
|
|
||||||
|
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8 = 0xC0AE
|
||||||
|
CipherSuite.ietfNames[0xC0AE] = 'TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8'
|
||||||
|
CipherSuite.ecdheEcdsaSuites.append(CipherSuite.
|
||||||
|
TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8)
|
||||||
|
|
||||||
|
CipherSuite.ecdhAllSuites.extend(CipherSuite.ecdheEcdsaSuites)
|
||||||
|
CipherSuite.certAllSuites.extend(CipherSuite.ecdheEcdsaSuites)
|
||||||
|
|
||||||
|
# obsolete stuff
|
||||||
|
CipherSuite.TLS_RSA_WITH_DES_CBC_SHA = 0x0009
|
||||||
|
CipherSuite.ietfNames[0x0009] = 'TLS_RSA_WITH_DES_CBC_SHA'
|
||||||
|
|
||||||
|
CipherSuite.TLS_RSA_EXPORT1024_WITH_RC4_56_SHA = 0x0064
|
||||||
|
CipherSuite.ietfNames[0x0064] = 'TLS_RSA_EXPORT1024_WITH_RC4_56_SHA'
|
||||||
|
CipherSuite.TLS_RSA_EXPORT1024_WITH_DES_CBC_SHA = 0x0062
|
||||||
|
CipherSuite.ietfNames[0x0062] = 'TLS_RSA_EXPORT1024_WITH_DES_CBC_SHA'
|
||||||
|
CipherSuite.TLS_RSA_EXPORT_WITH_RC4_40_MD5 = 0x0003
|
||||||
|
CipherSuite.ietfNames[0x0003] = 'TLS_RSA_EXPORT_WITH_RC4_40_MD5'
|
||||||
|
CipherSuite.TLS_RSA_EXPORT_WITH_RC2_CBC_40_MD5 = 0x0006
|
||||||
|
CipherSuite.ietfNames[0x0006] = 'TLS_RSA_EXPORT_WITH_RC2_CBC_40_MD5'
|
||||||
|
|
||||||
|
# DSS
|
||||||
|
CipherSuite.dheDssSuites = []
|
||||||
|
|
||||||
|
CipherSuite.TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA = 0x0013
|
||||||
|
CipherSuite.ietfNames[0x0013] = 'TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA'
|
||||||
|
CipherSuite.dheDssSuites.append(CipherSuite.
|
||||||
|
TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA)
|
||||||
|
|
||||||
|
CipherSuite.TLS_DHE_DSS_WITH_DES_CBC_SHA = 0x0012
|
||||||
|
CipherSuite.ietfNames[0x0012] = 'TLS_DHE_DSS_WITH_DES_CBC_SHA'
|
||||||
|
CipherSuite.dheDssSuites.append(CipherSuite.
|
||||||
|
TLS_DHE_DSS_WITH_DES_CBC_SHA)
|
||||||
|
|
||||||
|
CipherSuite.TLS_DHE_DSS_EXPORT1024_WITH_DES_CBC_SHA = 0x0063
|
||||||
|
CipherSuite.ietfNames[0x0063] = 'TLS_DHE_DSS_EXPORT1024_WITH_DES_CBC_SHA'
|
||||||
|
CipherSuite.dheDssSuites.append(CipherSuite.
|
||||||
|
TLS_DHE_DSS_EXPORT1024_WITH_DES_CBC_SHA)
|
||||||
|
|
||||||
|
CipherSuite.TLS_DHE_DSS_WITH_AES_128_CBC_SHA = 0x0032
|
||||||
|
CipherSuite.ietfNames[0x0032] = 'TLS_DHE_DSS_WITH_AES_128_CBC_SHA'
|
||||||
|
CipherSuite.dheDssSuites.append(CipherSuite.
|
||||||
|
TLS_DHE_DSS_WITH_AES_128_CBC_SHA)
|
||||||
|
|
||||||
|
CipherSuite.TLS_DHE_DSS_WITH_AES_256_CBC_SHA = 0x0038
|
||||||
|
CipherSuite.ietfNames[0x0038] = 'TLS_DHE_DSS_WITH_AES_256_CBC_SHA'
|
||||||
|
CipherSuite.dheDssSuites.append(CipherSuite.
|
||||||
|
TLS_DHE_DSS_WITH_AES_256_CBC_SHA)
|
||||||
|
|
||||||
|
CipherSuite.TLS_DHE_DSS_WITH_AES_128_CBC_SHA256 = 0x0040
|
||||||
|
CipherSuite.ietfNames[0x0040] = 'TLS_DHE_DSS_WITH_AES_128_CBC_SHA256'
|
||||||
|
CipherSuite.dheDssSuites.append(CipherSuite.
|
||||||
|
TLS_DHE_DSS_WITH_AES_128_CBC_SHA256)
|
||||||
|
|
||||||
|
CipherSuite.TLS_DHE_DSS_WITH_AES_256_CBC_SHA256 = 0x006a
|
||||||
|
CipherSuite.ietfNames[0x006a] = 'TLS_DHE_DSS_WITH_AES_256_CBC_SHA256'
|
||||||
|
CipherSuite.dheDssSuites.append(CipherSuite.
|
||||||
|
TLS_DHE_DSS_WITH_AES_256_CBC_SHA256)
|
||||||
|
|
||||||
|
|
||||||
|
class ExtensionType(tlslite.constants.ExtensionType):
|
||||||
|
"""Definitions of TLS extension IDs."""
|
||||||
|
|
||||||
|
status_request = 5
|
||||||
|
alpn = 16
|
||||||
|
session_ticket = 35
|
||||||
|
|
||||||
|
heartbeat = 15 # RFC 6520
|
||||||
|
status_request_v2 = 17 # RFC 6961
|
||||||
|
padding = 21 # RFC 7685
|
||||||
|
max_fragment_legth = 1 # RFC 6066
|
||||||
|
|
||||||
|
# From: Eric Rescorla <ekr at rtfm.com>
|
||||||
|
# Date: Mon, 7 Dec 2015 05:36:22 -0800
|
||||||
|
# [TLS] TLS 1.3 ServerConfiguration
|
||||||
|
early_data = 40
|
||||||
|
pre_shared_key = 41
|
||||||
|
key_share = 42
|
||||||
|
cookie = 43
|
||||||
|
|
||||||
|
|
||||||
|
class GroupName(tlslite.constants.GroupName):
|
||||||
|
"""ECDH and FFDH key exchange group names."""
|
||||||
|
|
||||||
|
allEC = list(tlslite.constants.GroupName.allEC)
|
||||||
|
allFF = list(tlslite.constants.GroupName.allFF)
|
||||||
|
|
||||||
|
ecdh_x25519 = 29
|
||||||
|
allEC.append(ecdh_x25519)
|
||||||
|
|
||||||
|
ecdh_x448 = 30
|
||||||
|
allEC.append(ecdh_x448)
|
||||||
|
|
||||||
|
eddsa_ed25519 = 31
|
||||||
|
allEC.append(eddsa_ed25519)
|
||||||
|
|
||||||
|
eddsa_ed448 = 32
|
||||||
|
allEC.append(eddsa_ed448)
|
||||||
|
|
||||||
|
all = allEC + allFF
|
||||||
|
|
||||||
|
|
||||||
|
class HandshakeType(tlslite.constants.HandshakeType):
|
||||||
|
"""Type of messages in Handshake protocol."""
|
||||||
|
|
||||||
|
certificate_status = 22
|
||||||
|
session_ticket = 4
|
198
cscan/extensions.py
Normal file
198
cscan/extensions.py
Normal file
@ -0,0 +1,198 @@
|
|||||||
|
# Copyright 2016(c) Hubert Kario
|
||||||
|
# This work is released under the Mozilla Public License Version 2.0
|
||||||
|
|
||||||
|
"""Extra TLS extensions."""
|
||||||
|
|
||||||
|
import tlslite.extensions
|
||||||
|
from tlslite.utils.codec import Writer
|
||||||
|
from tlslite.utils.compat import b2a_hex
|
||||||
|
from .constants import ExtensionType, GroupName
|
||||||
|
import .messages
|
||||||
|
|
||||||
|
# make TLSExtensions hashable (__eq__ is already defined in base class)
|
||||||
|
tlslite.extensions.TLSExtension.__hash__ = lambda self: hash(self.extType) ^ \
|
||||||
|
hash(bytes(self.extData))
|
||||||
|
|
||||||
|
|
||||||
|
class RenegotiationExtension(tlslite.extensions.TLSExtension):
|
||||||
|
"""Secure Renegotiation extension RFC 5746."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Initialize secure renegotiation extension."""
|
||||||
|
super(RenegotiationExtension, self).__init__(
|
||||||
|
extType=ExtensionType.renegotiation_info)
|
||||||
|
self.renegotiated_connection = None
|
||||||
|
|
||||||
|
def create(self, data):
|
||||||
|
"""Set the value of the Finished message."""
|
||||||
|
self.renegotiated_connection = data
|
||||||
|
|
||||||
|
@property
|
||||||
|
def extData(self):
|
||||||
|
"""Serialise the extension."""
|
||||||
|
if self.renegotiated_connection is None:
|
||||||
|
return bytearray(0)
|
||||||
|
|
||||||
|
writer = Writer()
|
||||||
|
writer.addVarSeq(self.renegotiated_connection, 1, 1)
|
||||||
|
return writer.bytes
|
||||||
|
|
||||||
|
def parse(self, parser):
|
||||||
|
"""Deserialise the extension from binary data."""
|
||||||
|
if parser.getRemainingLength() == 0:
|
||||||
|
self.renegotiated_connection = None
|
||||||
|
return
|
||||||
|
|
||||||
|
self.renegotiated_connection = parser.getVarBytes(1)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
"""Human readable representation of extension."""
|
||||||
|
return "RenegotiationExtension(renegotiated_connection={0!r})"\
|
||||||
|
.format(self.renegotiated_connection)
|
||||||
|
|
||||||
|
def __format__(self, formatstr):
|
||||||
|
"""Formatted representation of extension."""
|
||||||
|
data = messages.format_bytearray(self.renegotiated_connection,
|
||||||
|
formatstr)
|
||||||
|
return "RenegotiationExtension(renegotiated_connection={0})"\
|
||||||
|
.format(data)
|
||||||
|
|
||||||
|
tlslite.extensions.TLSExtension._universalExtensions[
|
||||||
|
ExtensionType.renegotiation_info] = RenegotiationExtension
|
||||||
|
|
||||||
|
|
||||||
|
class SessionTicketExtension(tlslite.extensions.TLSExtension):
|
||||||
|
"""Session Ticket extension (a.k.a. OCSP staple)."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Create Session Ticket extension."""
|
||||||
|
super(SessionTicketExtension, self).__init__(
|
||||||
|
extType=ExtensionType.session_ticket)
|
||||||
|
self.data = bytearray(0)
|
||||||
|
|
||||||
|
def parse(self, parser):
|
||||||
|
"""Deserialise the extension from binary data."""
|
||||||
|
self.data = parser.bytes
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __format__(self, formatstr):
|
||||||
|
"""Print extension data in human-readable form."""
|
||||||
|
data = messages.format_bytearray(self.data, formatstr)
|
||||||
|
return "SessionTicketExtension(data={0})".format(data)
|
||||||
|
|
||||||
|
tlslite.extensions.TLSExtension._universalExtensions[
|
||||||
|
ExtensionType.session_ticket] = SessionTicketExtension
|
||||||
|
|
||||||
|
|
||||||
|
class ServerStatusRequestExtension(tlslite.extensions.TLSExtension):
|
||||||
|
"""Server Status Request extension."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Create server status request extension."""
|
||||||
|
super(ServerStatusRequestExtension, self).__init__(
|
||||||
|
extType=ExtensionType.status_request)
|
||||||
|
|
||||||
|
def parse(self, parser):
|
||||||
|
"""Deserialise the extension from binary data."""
|
||||||
|
if parser.getRemainingLength() != 0:
|
||||||
|
raise SyntaxError() # FIXME
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
"""Human readable representation of the object."""
|
||||||
|
return "ServerStatusRequestExtension()"
|
||||||
|
|
||||||
|
tlslite.extensions.TLSExtension._serverExtensions[
|
||||||
|
ExtensionType.status_request] = ServerStatusRequestExtension
|
||||||
|
|
||||||
|
|
||||||
|
class KeyShareExtension(tlslite.extensions.TLSExtension):
|
||||||
|
"""TLS1.3 extension for handling key negotiation."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Create key share extension object."""
|
||||||
|
super(KeyShareExtension, self).__init__(
|
||||||
|
extType=ExtensionType.key_share)
|
||||||
|
self.client_shares = None
|
||||||
|
|
||||||
|
def create(self, shares):
|
||||||
|
"""
|
||||||
|
Set the list of key shares to send.
|
||||||
|
|
||||||
|
@type shares: list of tuples
|
||||||
|
@param shares: a list of tuples where the first element is a NamedGroup
|
||||||
|
ID while the second element in a tuple is an opaque bytearray encoding
|
||||||
|
of the key share.
|
||||||
|
"""
|
||||||
|
self.client_shares = shares
|
||||||
|
return self
|
||||||
|
|
||||||
|
@property
|
||||||
|
def extData(self):
|
||||||
|
"""Serialise the extension."""
|
||||||
|
if self.client_shares is None:
|
||||||
|
return bytearray(0)
|
||||||
|
|
||||||
|
writer = Writer()
|
||||||
|
for group_id, share in self.client_shares:
|
||||||
|
writer.add(group_id, 2)
|
||||||
|
if group_id in GroupName.allFF:
|
||||||
|
share_length_length = 2
|
||||||
|
else:
|
||||||
|
share_length_length = 1
|
||||||
|
writer.addVarSeq(share, 1, share_length_length)
|
||||||
|
ext_writer = Writer()
|
||||||
|
ext_writer.add(len(writer.bytes), 2)
|
||||||
|
ext_writer.bytes += writer.bytes
|
||||||
|
return ext_writer.bytes
|
||||||
|
|
||||||
|
def parse(self, parser):
|
||||||
|
"""Deserialise the extension."""
|
||||||
|
if parser.getRemainingLength() == 0:
|
||||||
|
self.client_shares = None
|
||||||
|
return
|
||||||
|
|
||||||
|
self.client_shares = []
|
||||||
|
|
||||||
|
parser.startLengthCheck(2)
|
||||||
|
while not parser.atLengthCheck():
|
||||||
|
group_id = parser.get(2)
|
||||||
|
if group_id in GroupName.allFF:
|
||||||
|
share_length_length = 2
|
||||||
|
else:
|
||||||
|
share_length_length = 1
|
||||||
|
share = parser.getVarBytes(share_length_length)
|
||||||
|
self.client_shares.append((group_id, share))
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
"""Human readble representation of extension."""
|
||||||
|
return "KeyShareExtension({0!r})".format(self.client_shares)
|
||||||
|
|
||||||
|
def __format__(self, formatstr):
|
||||||
|
"""Formattable representation of extension."""
|
||||||
|
if self.client_shares is None:
|
||||||
|
return "KeyShareExtension(None)"
|
||||||
|
|
||||||
|
verbose = ""
|
||||||
|
hexlify = False
|
||||||
|
if 'v' in formatstr:
|
||||||
|
verbose = "GroupName."
|
||||||
|
if 'h' in formatstr:
|
||||||
|
hexlify = True
|
||||||
|
|
||||||
|
shares = []
|
||||||
|
for group_id, share in self.client_shares:
|
||||||
|
if hexlify:
|
||||||
|
share = b2a_hex(share)
|
||||||
|
else:
|
||||||
|
share = repr(share)
|
||||||
|
shares += ["({0}{1}, {2})".format(verbose,
|
||||||
|
GroupName.toStr(group_id),
|
||||||
|
share)]
|
||||||
|
return "KeyShareExtension([" + ",".join(shares) + "])"
|
||||||
|
|
||||||
|
tlslite.extensions.TLSExtension._universalExtensions[
|
||||||
|
ExtensionType.key_share] = KeyShareExtension
|
264
cscan/messages.py
Normal file
264
cscan/messages.py
Normal file
@ -0,0 +1,264 @@
|
|||||||
|
# Copyright (c) 2016 Hubert Kario
|
||||||
|
# Released under Mozilla Public License 2.0
|
||||||
|
|
||||||
|
"""Extensions and modification of the tlslite-ng messages classes."""
|
||||||
|
|
||||||
|
import tlslite.messages as messages
|
||||||
|
from tlslite.utils.compat import b2a_hex
|
||||||
|
from tlslite.constants import ContentType, CertificateType, ECCurveType, \
|
||||||
|
HashAlgorithm, SignatureAlgorithm
|
||||||
|
from tlslite.x509certchain import X509CertChain
|
||||||
|
from tlslite.utils.cryptomath import secureHash
|
||||||
|
from .constants import HandshakeType, CipherSuite, GroupName
|
||||||
|
|
||||||
|
# gotta go fast
|
||||||
|
# comparing client hello's using ClientHello.write() is painfully slow
|
||||||
|
# monkey patch in faster compare methods
|
||||||
|
|
||||||
|
|
||||||
|
def __CH_eq_fun(self, other):
|
||||||
|
"""
|
||||||
|
Check if the other is equal to the object.
|
||||||
|
|
||||||
|
always returns false if other is not a ClientHello object
|
||||||
|
"""
|
||||||
|
if not isinstance(other, messages.ClientHello):
|
||||||
|
return False
|
||||||
|
|
||||||
|
return self.ssl2 == other.ssl2 and \
|
||||||
|
self.client_version == other.client_version and \
|
||||||
|
self.random == other.random and \
|
||||||
|
self.session_id == other.session_id and \
|
||||||
|
self.cipher_suites == other.cipher_suites and \
|
||||||
|
self.compression_methods == other.compression_methods and \
|
||||||
|
self.extensions == other.extensions
|
||||||
|
|
||||||
|
messages.ClientHello.__eq__ = __CH_eq_fun
|
||||||
|
|
||||||
|
|
||||||
|
def __CH_ne_fun(self, other):
|
||||||
|
"""
|
||||||
|
Check if the other is not equal to the object.
|
||||||
|
|
||||||
|
always returns true if other is not a ClientHello object
|
||||||
|
"""
|
||||||
|
return not self.__eq__(other)
|
||||||
|
|
||||||
|
messages.ClientHello.__ne__ = __CH_ne_fun
|
||||||
|
|
||||||
|
|
||||||
|
def format_bytearray(byte_array, formatstr):
|
||||||
|
"""Format method for bytearrays."""
|
||||||
|
if 'x' in formatstr:
|
||||||
|
return b2a_hex(byte_array)
|
||||||
|
else:
|
||||||
|
return repr(byte_array)
|
||||||
|
|
||||||
|
|
||||||
|
def format_array(array, formatstr):
|
||||||
|
"""Return string representation of array while formatting elements."""
|
||||||
|
if array is None:
|
||||||
|
return "None"
|
||||||
|
else:
|
||||||
|
str_array = []
|
||||||
|
for elem in array:
|
||||||
|
if elem.__class__.__format__ is not object.__format__:
|
||||||
|
str_array += ["{0:{1}}".format(elem, formatstr)]
|
||||||
|
else:
|
||||||
|
str_array += [repr(elem)]
|
||||||
|
return "[" + ", ".join(str_array) + "]"
|
||||||
|
|
||||||
|
|
||||||
|
class ServerHello(messages.ServerHello):
|
||||||
|
"""Class with enhanced human-readable serialisation."""
|
||||||
|
|
||||||
|
def __format__(self, formatstr):
|
||||||
|
"""Return human readable representation of the object."""
|
||||||
|
extensions = format_array(self.extensions, formatstr)
|
||||||
|
random = format_bytearray(self.random, formatstr)
|
||||||
|
session_id = format_bytearray(self.session_id, formatstr)
|
||||||
|
cipher_suite = CipherSuite.ietfNames.get(self.cipher_suite,
|
||||||
|
self.cipher_suite)
|
||||||
|
|
||||||
|
if 'v' in formatstr:
|
||||||
|
cipher_suite = "CipherSuite.{0}".format(cipher_suite)
|
||||||
|
|
||||||
|
# TODO cipher_suites (including verbose)
|
||||||
|
# TODO compression_method (including verbose)
|
||||||
|
return ("ServerHello(server_version=({0[0]}, {0[1]}), random={1}, "
|
||||||
|
"session_id={2!r}, cipher_suite={3}, compression_method={4}, "
|
||||||
|
"_tack_ext={5}, extensions={6})").format(
|
||||||
|
self.server_version, random, session_id,
|
||||||
|
cipher_suite, self.compression_method, self._tack_ext,
|
||||||
|
extensions)
|
||||||
|
|
||||||
|
|
||||||
|
class Certificate(messages.Certificate):
|
||||||
|
"""Class with more robust certificate parsing and serialisation."""
|
||||||
|
|
||||||
|
def parse(self, parser):
|
||||||
|
"""Deserialise the object from binary data."""
|
||||||
|
index = parser.index
|
||||||
|
try:
|
||||||
|
return super(Certificate, self).parse(parser)
|
||||||
|
except (AssertionError, SyntaxError):
|
||||||
|
pass
|
||||||
|
parser.index = index
|
||||||
|
parser.startLengthCheck(3)
|
||||||
|
if self.certificateType == CertificateType.x509:
|
||||||
|
chainLength = parser.get(3)
|
||||||
|
index = 0
|
||||||
|
certificate_list = []
|
||||||
|
while index != chainLength:
|
||||||
|
certBytes = parser.getVarBytes(3)
|
||||||
|
certificate_list.append(certBytes)
|
||||||
|
index += len(certBytes)+3
|
||||||
|
if certificate_list:
|
||||||
|
self.certChain = certificate_list
|
||||||
|
else:
|
||||||
|
raise AssertionError()
|
||||||
|
|
||||||
|
parser.stopLengthCheck()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __format__(self, formatstr):
|
||||||
|
"""Advanced formatting for messages."""
|
||||||
|
hexify = False
|
||||||
|
verbose = False
|
||||||
|
digest = False
|
||||||
|
if 'h' in formatstr:
|
||||||
|
hexify = True
|
||||||
|
if 'v' in formatstr:
|
||||||
|
verbose = True
|
||||||
|
if 'm' in formatstr:
|
||||||
|
digest = True
|
||||||
|
|
||||||
|
if self.certChain is None:
|
||||||
|
cert_list = None
|
||||||
|
else:
|
||||||
|
if isinstance(self.certChain, X509CertChain):
|
||||||
|
cert_list = [cert.bytes for cert in self.certChain.x509List]
|
||||||
|
else:
|
||||||
|
cert_list = self.certChain
|
||||||
|
|
||||||
|
if digest:
|
||||||
|
cert_list = "[" + ", ".join(b2a_hex(secureHash(cert, 'sha256'))
|
||||||
|
for cert in cert_list) + "]"
|
||||||
|
else:
|
||||||
|
cert_list = [repr(cert) for cert in cert_list]
|
||||||
|
|
||||||
|
return "Certificate({0})".format(cert_list)
|
||||||
|
|
||||||
|
|
||||||
|
class NewSessionTicket(messages.HandshakeMsg):
|
||||||
|
"""Class for handling the Session Tickets from RFC 5077."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Initilize new sesion ticket message object."""
|
||||||
|
super(NewSessionTicket, self).__init__(HandshakeType.session_ticket)
|
||||||
|
self.ticket_lifetime_hintt = 0
|
||||||
|
self.ticket = None
|
||||||
|
|
||||||
|
def parse(self, parser):
|
||||||
|
"""Parse the object from on-the-wire data."""
|
||||||
|
self.ticket_lifetime_hint = parser.get(4)
|
||||||
|
self.ticket = parser.getVarBytes(2)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __format__(self, formatstr):
|
||||||
|
"""Return human-readable representation of the object."""
|
||||||
|
ticket = format_bytearray(self.ticket, formatstr)
|
||||||
|
return "NewSessionTicket(ticket_lifetime_hint={0}, ticket={1})"\
|
||||||
|
.format(self.ticket_lifetime_hintt, ticket)
|
||||||
|
|
||||||
|
|
||||||
|
class CertificateStatus(messages.HandshakeMsg):
|
||||||
|
"""Class for handling the CertificateStatus OCSP staples from RFC 4366."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Create a certificate status message handling object."""
|
||||||
|
super(CertificateStatus, self).__init__(
|
||||||
|
HandshakeType.certificate_status)
|
||||||
|
self.status_type = 0
|
||||||
|
self.response = None
|
||||||
|
|
||||||
|
def parse(self, parser):
|
||||||
|
"""Deserialise certificate status message from binary data."""
|
||||||
|
parser.startLengthCheck(3)
|
||||||
|
self.status_type = parser.get(1)
|
||||||
|
if self.status_type == 1: # FIXME, create registry
|
||||||
|
self.response = parser.getVarBytes(3)
|
||||||
|
else:
|
||||||
|
raise SyntaxError() # FIXME, use sane-er type
|
||||||
|
parser.stopLengthCheck()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __format__(self, formatstr):
|
||||||
|
"""Return human-readable representation of certificate status."""
|
||||||
|
response = format_bytearray(self.response, formatstr)
|
||||||
|
return "CertificateStatus(status_type={0}, response={1})"\
|
||||||
|
.format(self.status_type, response)
|
||||||
|
|
||||||
|
|
||||||
|
class Message(messages.Message):
|
||||||
|
"""Message class with more robust formatting capability."""
|
||||||
|
|
||||||
|
def __format__(self, formatstr):
|
||||||
|
"""Advanced formatting for messages."""
|
||||||
|
hexify = False
|
||||||
|
verbose = ""
|
||||||
|
if 'h' in formatstr:
|
||||||
|
hexify = True
|
||||||
|
if 'v' in formatstr:
|
||||||
|
verbose = "ContentType."
|
||||||
|
|
||||||
|
if hexify:
|
||||||
|
data = b2a_hex(self.data)
|
||||||
|
else:
|
||||||
|
data = repr(self.data)
|
||||||
|
|
||||||
|
return "Message(contentType={0}{1}, data={2})"\
|
||||||
|
.format(verbose, ContentType.toStr(self.contentType), data)
|
||||||
|
|
||||||
|
|
||||||
|
class ServerKeyExchange(messages.ServerKeyExchange):
|
||||||
|
"""ServerKeyExchange class with more robust formatting capability."""
|
||||||
|
|
||||||
|
def parse(self, parser):
|
||||||
|
"""more robust parser for SKE"""
|
||||||
|
try:
|
||||||
|
super(ServerKeyExchange, self).parse(parser)
|
||||||
|
except AssertionError:
|
||||||
|
pass
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __format__(self, formatstr):
|
||||||
|
"""Return human-readable representation of the object."""
|
||||||
|
if 'v' in formatstr:
|
||||||
|
verbose = "CipherSuite."
|
||||||
|
else:
|
||||||
|
verbose = ""
|
||||||
|
|
||||||
|
ret = "ServerKeyExchange(cipherSuite={0}{1}, version={2}"\
|
||||||
|
.format(verbose, CipherSuite.ietfNames[self.cipherSuite],
|
||||||
|
self.version)
|
||||||
|
if self.srp_N:
|
||||||
|
ret += ", srp_N={0}, srp_g={1}, srp_s={2}, srp_B={3}"\
|
||||||
|
.format(self.srp_N, self.srp_g, self.srp_s, self.srp_B)
|
||||||
|
if self.dh_p:
|
||||||
|
ret += ", dh_p={0}, dh_g={1}, dh_Ys={2}"\
|
||||||
|
.format(self.dh_p, self.dh_g, self.dh_Ys)
|
||||||
|
if self.ecdh_Ys:
|
||||||
|
ecdh_Ys = format_bytearray(self.ecdh_Ys, formatstr)
|
||||||
|
ret += ", curve_type={0}, named_curve={1}, ecdh_Ys={2}"\
|
||||||
|
.format(ECCurveType.toStr(self.curve_type),
|
||||||
|
GroupName.toStr(self.named_curve), ecdh_Ys)
|
||||||
|
if self.signAlg:
|
||||||
|
ret += ", hashAlg={0}, signAlg={1}"\
|
||||||
|
.format(HashAlgorithm.toStr(self.hashAlg),
|
||||||
|
SignatureAlgorithm.toStr(self.signAlg))
|
||||||
|
if self.signature:
|
||||||
|
ret += ", signature={0}"\
|
||||||
|
.format(format_bytearray(self.signature, formatstr))
|
||||||
|
|
||||||
|
return ret + ")"
|
29
cscan/modifiers.py
Normal file
29
cscan/modifiers.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
# Copyright (c) 2016 Hubert Kario
|
||||||
|
# Released under Mozilla Public License 2.0
|
||||||
|
|
||||||
|
"""Methods for modifying the scan configurations on the fly."""
|
||||||
|
|
||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
|
proto_versions = {(3, 0): "SSLv3",
|
||||||
|
(3, 1): "TLSv1.0",
|
||||||
|
(3, 2): "TLSv1.1",
|
||||||
|
(3, 3): "TLSv1.2",
|
||||||
|
(3, 4): "TLSv1.3",
|
||||||
|
(3, 5): "TLSv1.4",
|
||||||
|
(3, 6): "TLSv1.5"}
|
||||||
|
|
||||||
|
|
||||||
|
def version_to_str(version):
|
||||||
|
"""Convert a version tuple to human-readable string."""
|
||||||
|
version_name = proto_versions.get(version, None)
|
||||||
|
if version_name is None:
|
||||||
|
version_name = "{0[0]}.{0[1]}".format(version)
|
||||||
|
return version_name
|
||||||
|
|
||||||
|
|
||||||
|
def set_hello_version(generator, version):
|
||||||
|
"""Set client hello version."""
|
||||||
|
generator.version = version
|
||||||
|
generator.modifications += [version_to_str(version)]
|
||||||
|
return generator
|
157
cscan/scanner.py
Normal file
157
cscan/scanner.py
Normal file
@ -0,0 +1,157 @@
|
|||||||
|
# Copyright (c) 2016 Hubert Kario
|
||||||
|
# Released under the Mozilla Public License 2.0
|
||||||
|
|
||||||
|
"""Classes used for scanning servers and getting their responses."""
|
||||||
|
|
||||||
|
import socket
|
||||||
|
|
||||||
|
from .constants import CipherSuite, HandshakeType
|
||||||
|
from .messages import Certificate, ServerHello, Message, NewSessionTicket, \
|
||||||
|
CertificateStatus, ServerKeyExchange
|
||||||
|
from tlslite.constants import CertificateType, ContentType
|
||||||
|
from tlslite.messages import \
|
||||||
|
CertificateRequest, NextProtocol, ServerHelloDone, Alert
|
||||||
|
from tlslite.defragmenter import Defragmenter
|
||||||
|
from tlslite.messagesocket import MessageSocket
|
||||||
|
from tlslite.errors import TLSAbruptCloseError, TLSIllegalParameterException
|
||||||
|
|
||||||
|
|
||||||
|
class HandshakeParser(object):
|
||||||
|
"""Inteligent parser for handshake messages."""
|
||||||
|
|
||||||
|
def __init__(self, version=(3, 1),
|
||||||
|
cipher_suite=CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
|
||||||
|
certificate_type=CertificateType.x509):
|
||||||
|
"""Initialize parser object."""
|
||||||
|
self.version = version
|
||||||
|
self.cipher_suite = cipher_suite
|
||||||
|
self.certificate_type = certificate_type
|
||||||
|
|
||||||
|
def parse(self, parser):
|
||||||
|
"""Parse a handshake message."""
|
||||||
|
hs_type = parser.get(1)
|
||||||
|
if hs_type == HandshakeType.server_hello:
|
||||||
|
msg = ServerHello().parse(parser)
|
||||||
|
self.version = msg.server_version
|
||||||
|
self.cipher_suite = msg.cipher_suite
|
||||||
|
self.certificate_type = msg.certificate_type
|
||||||
|
return msg
|
||||||
|
elif hs_type == HandshakeType.certificate:
|
||||||
|
msg = Certificate(self.certificate_type)
|
||||||
|
elif hs_type == HandshakeType.server_key_exchange:
|
||||||
|
msg = ServerKeyExchange(self.cipher_suite, self.version)
|
||||||
|
elif hs_type == HandshakeType.certificate_request:
|
||||||
|
msg = CertificateRequest(self.version)
|
||||||
|
elif hs_type == HandshakeType.next_protocol:
|
||||||
|
msg = NextProtocol().parse(parser)
|
||||||
|
elif hs_type == HandshakeType.server_hello_done:
|
||||||
|
msg = ServerHelloDone()
|
||||||
|
elif hs_type == HandshakeType.session_ticket:
|
||||||
|
msg = NewSessionTicket()
|
||||||
|
elif hs_type == HandshakeType.certificate_status:
|
||||||
|
msg = CertificateStatus()
|
||||||
|
else:
|
||||||
|
raise ValueError("Unknown handshake type: {0}".format(hs_type))
|
||||||
|
|
||||||
|
# don't abort when we can't parse a message, save it as unparsed
|
||||||
|
try:
|
||||||
|
msg.parse(parser)
|
||||||
|
except SyntaxError:
|
||||||
|
msg = Message(ContentType.handshake, parser.bytes)
|
||||||
|
return msg
|
||||||
|
|
||||||
|
|
||||||
|
class Scanner(object):
|
||||||
|
"""Helper class for scanning a host and returning serialised responses."""
|
||||||
|
|
||||||
|
def __init__(self, hello_gen, host, port=443, hostname=None):
|
||||||
|
"""Initialize scanner."""
|
||||||
|
self.host = host
|
||||||
|
self.hello_gen = hello_gen
|
||||||
|
self.port = port
|
||||||
|
self.hostname = hostname
|
||||||
|
|
||||||
|
def scan(self):
|
||||||
|
"""Perform a scan on server."""
|
||||||
|
defragger = Defragmenter()
|
||||||
|
defragger.addStaticSize(ContentType.change_cipher_spec, 1)
|
||||||
|
defragger.addStaticSize(ContentType.alert, 2)
|
||||||
|
defragger.addDynamicSize(ContentType.handshake, 1, 3)
|
||||||
|
|
||||||
|
try:
|
||||||
|
raw_sock = socket.create_connection((self.host, self.port), 5)
|
||||||
|
except socket.error as e:
|
||||||
|
return [e]
|
||||||
|
|
||||||
|
sock = MessageSocket(raw_sock, defragger)
|
||||||
|
|
||||||
|
if self.hostname is not None:
|
||||||
|
client_hello = self.hello_gen(bytearray(self.hostname,
|
||||||
|
'utf-8'))
|
||||||
|
else:
|
||||||
|
client_hello = self.hello_gen(None)
|
||||||
|
|
||||||
|
# record layer version - TLSv1.x
|
||||||
|
# use the version from configuration, if present, or default to the
|
||||||
|
# RFC recommended (3, 1) for TLS and (3, 0) for SSLv3
|
||||||
|
if hasattr(client_hello, 'record_version'):
|
||||||
|
sock.version = client_hello.record_version
|
||||||
|
elif hasattr(self.hello_gen, 'record_version'):
|
||||||
|
sock.version = self.hello_gen.record_version
|
||||||
|
elif client_hello.client_version > (3, 1): # TLS1.0
|
||||||
|
sock.version = (3, 1)
|
||||||
|
else:
|
||||||
|
sock.version = client_hello.client_version
|
||||||
|
|
||||||
|
# we don't want to send invalid messages (SSLv2 hello in SSL record
|
||||||
|
# layer), so set the record layer version to SSLv2 if the hello is
|
||||||
|
# of SSLv2 format
|
||||||
|
if client_hello.ssl2:
|
||||||
|
sock.version = (0, 2)
|
||||||
|
|
||||||
|
# save the record version used in the end for later analysis
|
||||||
|
client_hello.record_version = sock.version
|
||||||
|
|
||||||
|
messages = [client_hello]
|
||||||
|
|
||||||
|
handshake_parser = HandshakeParser()
|
||||||
|
|
||||||
|
try:
|
||||||
|
sock.sendMessageBlocking(client_hello)
|
||||||
|
except socket.error as e:
|
||||||
|
messages.append(e)
|
||||||
|
return messages
|
||||||
|
except TLSAbruptCloseError as e:
|
||||||
|
sock.sock.close()
|
||||||
|
messages.append(e)
|
||||||
|
return messages
|
||||||
|
|
||||||
|
# get all the server messages that affect connection, abort as soon
|
||||||
|
# as they've been read
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
header, parser = sock.recvMessageBlocking()
|
||||||
|
|
||||||
|
if header.type == ContentType.alert:
|
||||||
|
alert = Alert()
|
||||||
|
alert.parse(parser)
|
||||||
|
alert.record_version = header.version
|
||||||
|
messages += [alert]
|
||||||
|
elif header.type == ContentType.handshake:
|
||||||
|
msg = handshake_parser.parse(parser)
|
||||||
|
msg.record_version = header.version
|
||||||
|
messages += [msg]
|
||||||
|
if isinstance(msg, ServerHelloDone):
|
||||||
|
return messages
|
||||||
|
else:
|
||||||
|
raise TypeError("Unknown content type: {0}"
|
||||||
|
.format(header.type))
|
||||||
|
except (TLSAbruptCloseError, TLSIllegalParameterException,
|
||||||
|
ValueError, TypeError, socket.error, SyntaxError) as e:
|
||||||
|
messages += [e]
|
||||||
|
return messages
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
sock.sock.close()
|
||||||
|
except (socket.error, OSError):
|
||||||
|
pass
|
0
cscan_tests/__init__.py
Normal file
0
cscan_tests/__init__.py
Normal file
50
cscan_tests/test_config.py
Normal file
50
cscan_tests/test_config.py
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
# Copyright (c) 2015 Hubert Kario
|
||||||
|
# Released under Mozilla Public License Version 2.0
|
||||||
|
|
||||||
|
try:
|
||||||
|
import unittest2 as unittest
|
||||||
|
except ImportError:
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from tlslite.messages import ClientHello
|
||||||
|
from tlslite.extensions import SNIExtension, SupportedGroupsExtension, \
|
||||||
|
ECPointFormatsExtension, NPNExtension, SignatureAlgorithmsExtension
|
||||||
|
from tlslite.utils.codec import Parser
|
||||||
|
from cscan.config import Firefox_42
|
||||||
|
from cscan.extensions import RenegotiationExtension
|
||||||
|
from cscan.constants import ExtensionType
|
||||||
|
|
||||||
|
class TestFirefox(unittest.TestCase):
|
||||||
|
def test_firefox_42(self):
|
||||||
|
gen = Firefox_42()
|
||||||
|
ch = gen(bytearray(b'example.com'))
|
||||||
|
|
||||||
|
self.assertIsNotNone(ch)
|
||||||
|
self.assertIsInstance(ch, ClientHello)
|
||||||
|
self.assertEqual(len(ch.write()), 176)
|
||||||
|
self.assertEqual(ch.client_version, (3, 3))
|
||||||
|
self.assertEqual(gen.record_version, (3, 1))
|
||||||
|
self.assertEqual(len(ch.cipher_suites), 11)
|
||||||
|
self.assertIsInstance(ch.extensions[0], SNIExtension)
|
||||||
|
self.assertEqual(ch.extensions[1].extType,
|
||||||
|
ExtensionType.renegotiation_info)
|
||||||
|
self.assertIsInstance(ch.extensions[2],
|
||||||
|
SupportedGroupsExtension)
|
||||||
|
self.assertIsInstance(ch.extensions[3],
|
||||||
|
ECPointFormatsExtension)
|
||||||
|
self.assertEqual(ch.extensions[4].extType,
|
||||||
|
ExtensionType.session_ticket)
|
||||||
|
# bug in tlslite-ng, removes NPN extensions from provided extensions
|
||||||
|
#self.assertIsInstance(ch.extensions[5],
|
||||||
|
# NPNExtension)
|
||||||
|
self.assertEqual(ch.extensions[5].extType,
|
||||||
|
ExtensionType.alpn)
|
||||||
|
self.assertEqual(ch.extensions[6].extType,
|
||||||
|
ExtensionType.status_request)
|
||||||
|
self.assertIsInstance(ch.extensions[7],
|
||||||
|
SignatureAlgorithmsExtension)
|
||||||
|
self.assertEqual(ch.compression_methods, [0])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
76
cscan_tests/test_extensions.py
Normal file
76
cscan_tests/test_extensions.py
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
# Copyright (c) 2015 Hubert Kario
|
||||||
|
# Released under Mozilla Public License Version 2.0
|
||||||
|
|
||||||
|
try:
|
||||||
|
import unittest2 as unittest
|
||||||
|
except ImportError:
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from tlslite.utils.codec import Parser
|
||||||
|
from cscan.extensions import KeyShareExtension
|
||||||
|
from cscan.constants import GroupName
|
||||||
|
|
||||||
|
class TestKeyShareExtension(unittest.TestCase):
|
||||||
|
def test___init__(self):
|
||||||
|
ext = KeyShareExtension()
|
||||||
|
|
||||||
|
self.assertIsNotNone(ext)
|
||||||
|
|
||||||
|
def test_create(self):
|
||||||
|
ext = KeyShareExtension()
|
||||||
|
|
||||||
|
ext.create([(1, bytearray(b'\x12')),
|
||||||
|
(2, bytearray(b'\x33'))])
|
||||||
|
|
||||||
|
self.assertEqual(ext.client_shares, [(1, bytearray(b'\x12')),
|
||||||
|
(2, bytearray(b'\x33'))])
|
||||||
|
|
||||||
|
def test_write(self):
|
||||||
|
ext = KeyShareExtension()
|
||||||
|
|
||||||
|
ext.create([(GroupName.secp256r1, bytearray(b'\xff\xfa')),
|
||||||
|
(GroupName.ffdhe2048, bytearray(b'\xaf\xaa'))])
|
||||||
|
|
||||||
|
data = ext.write()
|
||||||
|
|
||||||
|
self.assertEqual(data, bytearray(
|
||||||
|
b'\x00\x2a\x00\x0d'
|
||||||
|
b'\x00\x0b'
|
||||||
|
b'\x00\x17\x02\xff\xfa'
|
||||||
|
b'\x01\x00\x00\x02\xaf\xaa'))
|
||||||
|
|
||||||
|
def test_write_with_no_data(self):
|
||||||
|
ext = KeyShareExtension()
|
||||||
|
|
||||||
|
data = ext.write()
|
||||||
|
|
||||||
|
self.assertEqual(data, bytearray(b'\x00\x2a\x00\x00'))
|
||||||
|
|
||||||
|
def test_parse(self):
|
||||||
|
parser = Parser(bytearray(
|
||||||
|
#b'\x00\x2a\x00\x0d'
|
||||||
|
b'\x00\x0b'
|
||||||
|
b'\x00\x17\x02\xff\xfa'
|
||||||
|
b'\x01\x00\x00\x02\xaf\xaa'))
|
||||||
|
|
||||||
|
ext = KeyShareExtension()
|
||||||
|
ext.parse(parser)
|
||||||
|
|
||||||
|
self.assertEqual(ext.client_shares,
|
||||||
|
[(GroupName.secp256r1, bytearray(b'\xff\xfa')),
|
||||||
|
(GroupName.ffdhe2048, bytearray(b'\xaf\xaa'))])
|
||||||
|
|
||||||
|
def test_parse_with_no_data(self):
|
||||||
|
parser = Parser(bytearray())
|
||||||
|
|
||||||
|
ext = KeyShareExtension()
|
||||||
|
ext.parse(parser)
|
||||||
|
|
||||||
|
self.assertIsNone(ext.client_shares)
|
||||||
|
|
||||||
|
def test___repr__(self):
|
||||||
|
ext = KeyShareExtension()
|
||||||
|
ext.create([(1, bytearray(b'\xff'))])
|
||||||
|
|
||||||
|
self.assertEqual("KeyShareExtension([(1, bytearray(b\'\\xff\'))])",
|
||||||
|
repr(ext))
|
@ -114,6 +114,8 @@ ecccurve = defaultdict(int)
|
|||||||
npn = defaultdict(int)
|
npn = defaultdict(int)
|
||||||
ocspstaple = defaultdict(int)
|
ocspstaple = defaultdict(int)
|
||||||
fallbacks = defaultdict(int)
|
fallbacks = defaultdict(int)
|
||||||
|
intolerancies = defaultdict(int)
|
||||||
|
impl_families = defaultdict(int)
|
||||||
# array with indexes of fallback names for the matrix report
|
# array with indexes of fallback names for the matrix report
|
||||||
fallback_ids = defaultdict(int)
|
fallback_ids = defaultdict(int)
|
||||||
i=0
|
i=0
|
||||||
@ -177,6 +179,8 @@ for r,d,flist in os.walk(path):
|
|||||||
tempecccurve = {}
|
tempecccurve = {}
|
||||||
tempnpn = {}
|
tempnpn = {}
|
||||||
tempfallbacks = {}
|
tempfallbacks = {}
|
||||||
|
tempintolerancies = {}
|
||||||
|
tempimpl_families = {}
|
||||||
""" supported ciphers by the server under scan """
|
""" supported ciphers by the server under scan """
|
||||||
tempcipherstats = {}
|
tempcipherstats = {}
|
||||||
temppfssigalgordering = {}
|
temppfssigalgordering = {}
|
||||||
@ -351,6 +355,30 @@ for r,d,flist in os.walk(path):
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
if 'intolerancies' in results:
|
||||||
|
intoler = results['intolerancies']
|
||||||
|
for name, val in intoler.items():
|
||||||
|
if val is True:
|
||||||
|
tempintolerancies[name] = 1
|
||||||
|
intol = [x.replace(' ', '_')
|
||||||
|
for x in tempintolerancies.keys()]
|
||||||
|
all_above_tls_1_2 = ('TLS_1.3', 'TLS_1.4', 'SSL_3.254',
|
||||||
|
'SSL_4.0', 'SSL_4.3', 'SSL_255.255')
|
||||||
|
if all(i in intol for i in all_above_tls_1_2):
|
||||||
|
for i in all_above_tls_1_2:
|
||||||
|
intol.remove(i)
|
||||||
|
intol.append('TLS_1.3+')
|
||||||
|
all_above_ssl_4_0 = ('SSL_4.3', 'SSL_4.0', 'SSL_255.255')
|
||||||
|
if all(i in intol for i in all_above_ssl_4_0):
|
||||||
|
for i in all_above_ssl_4_0:
|
||||||
|
intol.remove(i)
|
||||||
|
intol.append("SSL_4.0+")
|
||||||
|
if intol:
|
||||||
|
intol.sort(reverse=True)
|
||||||
|
tempimpl_families[" ".join(intol)] = 1
|
||||||
|
else:
|
||||||
|
tempintolerancies['x:missing information'] = 1
|
||||||
|
|
||||||
""" get some extra data about server """
|
""" get some extra data about server """
|
||||||
if 'renegotiation' in results:
|
if 'renegotiation' in results:
|
||||||
temprenegotiation[results['renegotiation']] = 1
|
temprenegotiation[results['renegotiation']] = 1
|
||||||
@ -582,6 +610,12 @@ for r,d,flist in os.walk(path):
|
|||||||
for s in tempfallbacks:
|
for s in tempfallbacks:
|
||||||
fallbacks[s] += 1
|
fallbacks[s] += 1
|
||||||
|
|
||||||
|
for s in tempintolerancies:
|
||||||
|
intolerancies[s] += 1
|
||||||
|
|
||||||
|
for s in tempimpl_families:
|
||||||
|
impl_families[s] += 1
|
||||||
|
|
||||||
for s in tempsigstats:
|
for s in tempsigstats:
|
||||||
sigalg[s] += 1
|
sigalg[s] += 1
|
||||||
|
|
||||||
@ -920,3 +954,15 @@ print("------------------------")
|
|||||||
fallback_ids_sorted=sorted(fallback_ids.items(), key=operator.itemgetter(1))
|
fallback_ids_sorted=sorted(fallback_ids.items(), key=operator.itemgetter(1))
|
||||||
for touple in fallback_ids_sorted:
|
for touple in fallback_ids_sorted:
|
||||||
print(str(touple[1]+1).rjust(3) + " " + str(touple[0]))
|
print(str(touple[1]+1).rjust(3) + " " + str(touple[0]))
|
||||||
|
|
||||||
|
print("\nClient Hello intolerance Count Percent")
|
||||||
|
print("----------------------------------------+---------+-------")
|
||||||
|
for stat in natural_sort(intolerancies):
|
||||||
|
percent = round(intolerancies[stat] / total * 100, 4)
|
||||||
|
sys.stdout.write(stat.ljust(40) + " " + str(intolerancies[stat]).ljust(10) + str(percent).ljust(4) + "\n")
|
||||||
|
|
||||||
|
print("\nImplementation families Count Percent")
|
||||||
|
print("-----------------------------------------------------+-----------+-------")
|
||||||
|
for stat in natural_sort(impl_families):
|
||||||
|
percent = round(impl_families[stat] / total * 100, 4)
|
||||||
|
sys.stdout.write(stat.ljust(50) + " " + str(impl_families[stat]).ljust(10) + str(percent).ljust(4) + "\n")
|
||||||
|
Loading…
Reference in New Issue
Block a user