mirror of
https://github.com/mozilla/cipherscan.git
synced 2024-10-06 02:53:42 +02:00
450 lines
14 KiB
Python
450 lines
14 KiB
Python
#!/usr/bin/env python
|
|
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
# Author: Hubert Kario - 2014
|
|
|
|
from __future__ import division
|
|
|
|
path = "./results/"
|
|
ca_certs_path = "./ca_files"
|
|
certs_path = "./certs"
|
|
|
|
""" only root CAs, no cached intermediate certs """
|
|
trust_path = "./ca_trusted"
|
|
|
|
import json
|
|
import sys
|
|
from collections import defaultdict
|
|
import os
|
|
import re
|
|
import subprocess
|
|
from OpenSSL import crypto
|
|
from M2Crypto import X509, EVP
|
|
from m2ext import _m2ext
|
|
from m2ext import SSL
|
|
import glob
|
|
|
|
from pprint import pprint
|
|
|
|
# override m2ext implementation so that it is possible to provide additional
|
|
# certificates to be used during verification. Requires m2ext with a small fix
|
|
# from https://github.com/tomato42/m2ext/tree/extended_ctx_init
|
|
class Context(SSL.Context):
|
|
def validate_certificate(self, cert, chain=None):
|
|
"""
|
|
Validate a certificate using this SSL Context
|
|
"""
|
|
if chain:
|
|
ptr = chain._ptr()
|
|
else:
|
|
ptr = None
|
|
store_ctx = X509.X509_Store_Context(_m2ext.x509_store_ctx_new(), _pyfree=1)
|
|
_m2ext.x509_store_ctx_init(store_ctx.ctx,
|
|
self.get_cert_store().store,
|
|
cert.x509, ptr)
|
|
rc = _m2ext.x509_verify_cert(store_ctx.ctx)
|
|
if rc < 0:
|
|
raise SSL.SSLError("Empty context")
|
|
return rc != 0
|
|
|
|
|
|
invocations = defaultdict(int)
|
|
|
|
total = 0
|
|
hosts = 0
|
|
chains = defaultdict(int)
|
|
chain_len = defaultdict(int)
|
|
keysize = defaultdict(int)
|
|
keysize_per_chain = defaultdict(int)
|
|
root_CA = defaultdict(int)
|
|
sig_alg = defaultdict(int)
|
|
intermediate_CA = defaultdict(int)
|
|
effective_security = defaultdict(int)
|
|
|
|
subject_hashes = {}
|
|
issuer_hashes = {}
|
|
|
|
def get_cert_subject_name(cert):
|
|
subject = cert.get_subject()
|
|
commonName = None
|
|
organization = None
|
|
|
|
for elem,val in subject.get_components():
|
|
if elem == "CN" and commonName is None:
|
|
commonName = val
|
|
if elem == "O" and organization is None:
|
|
organization = val
|
|
|
|
s_hash = "(" + ("%0.8X" % subject.hash()).lower() + ") "
|
|
|
|
if commonName is not None:
|
|
return s_hash + commonName
|
|
elif organization is not None:
|
|
return s_hash + organization
|
|
else:
|
|
return s_hash
|
|
|
|
def get_cert_hashes(path):
|
|
if path in subject_hashes:
|
|
return subject_hashes[path], issuer_hashes[path]
|
|
|
|
with open(path) as srv_c_f:
|
|
srv_c_pem = srv_c_f.read()
|
|
|
|
srv_c = crypto.load_certificate(crypto.FILETYPE_PEM, srv_c_pem)
|
|
|
|
# can't make M2Crypto to output OpenSSL-compatible hashes...
|
|
subject_hash = ("%0.8X" % srv_c.get_subject().hash()).lower()
|
|
issuer_hash = ("%0.8X" % srv_c.get_issuer().hash()).lower()
|
|
|
|
subject_hashes[path] = subject_hash
|
|
issuer_hashes[path] = issuer_hash
|
|
|
|
return subject_hash, issuer_hash
|
|
|
|
def gen_cert_paths(paths):
|
|
|
|
# failsafe in case of a loop in path resolution
|
|
if len(paths) > 10:
|
|
return
|
|
|
|
subject_hash, issuer_hash = get_cert_hashes(paths[-1])
|
|
|
|
if subject_hash == issuer_hash:
|
|
yield paths
|
|
else:
|
|
for ca_file in glob.glob(ca_certs_path + '/' + issuer_hash + ".*"):
|
|
for perm in gen_cert_paths(paths + [ca_file]):
|
|
if not perm in paths:
|
|
yield perm
|
|
|
|
def is_chain_complete_f(file_names):
|
|
|
|
stack = X509.X509_Stack()
|
|
for f_name in file_names[1:]:
|
|
cert = X509.load_cert(f_name)
|
|
stack.push(cert)
|
|
|
|
cert = X509.load_cert(file_names[0])
|
|
|
|
return trusted_context.validate_certificate(cert, stack)
|
|
|
|
def is_chain_complete(certs):
|
|
|
|
stack = X509.X509_Stack()
|
|
|
|
for cert in certs[1:]:
|
|
stack.push(cert)
|
|
|
|
return trusted_context.validate_certificate(certs[0], stack)
|
|
|
|
def is_chain_trusted(cert_hashes):
|
|
|
|
c_hash = cert_hashes[0]
|
|
""" first check the likely option: the cert dir """
|
|
file_name = certs_path + '/' + c_hash + '.pem'
|
|
if not os.path.exists(file_name):
|
|
""" then try the unlikely option: ca directory """
|
|
file_name = ca_certs_path + '/' + c_hash + '.pem'
|
|
if not os.path.exists(file_name):
|
|
print "File with hash " + c_hash + " is missing!"
|
|
return False,None
|
|
|
|
for cert_paths in gen_cert_paths([ file_name ]):
|
|
if is_chain_complete_f(cert_paths):
|
|
return True,cert_paths
|
|
|
|
return False,None
|
|
|
|
def get_path_for_hash(cert_hash):
|
|
f_name = certs_path + '/' + c_hash + '.pem'
|
|
if not os.path.exists(f_name):
|
|
f_name = ca_certs_path + '/' + c_hash + '.pem'
|
|
if not os.path.exists(f_name):
|
|
#print "File with hash " + c_hash + " is missing!"
|
|
return None
|
|
return f_name
|
|
|
|
def is_chain_trusted_at_all(cert_list):
|
|
certs = []
|
|
stack = X509.X509_Stack()
|
|
|
|
cert = cert_list[0]
|
|
|
|
for ca in cert_list[1:]:
|
|
stack.push(ca)
|
|
|
|
return all_CAs_context.validate_certificate(cert, stack)
|
|
|
|
""" convert RSA and DSA key sizes to estimated Level of security """
|
|
def rsa_key_size_to_los(size):
|
|
if size < 760:
|
|
return 40
|
|
elif size < 1020:
|
|
return 64
|
|
elif size < 2040:
|
|
return 80
|
|
elif size < 3068:
|
|
return 112
|
|
elif size < 4094:
|
|
return 128
|
|
elif size < 7660:
|
|
return 152
|
|
elif size < 15300:
|
|
return 192
|
|
else:
|
|
return 256
|
|
|
|
def sig_alg_to_los(name):
|
|
if 'SHA1' in name.upper():
|
|
return 80
|
|
elif 'SHA224' in name.upper():
|
|
return 112
|
|
elif 'SHA256' in name.upper():
|
|
return 128
|
|
elif 'SHA384' in name.upper():
|
|
return 192
|
|
elif 'SHA512' in name.upper():
|
|
return 256
|
|
else:
|
|
raise UnknownSigAlgError
|
|
|
|
def collect_key_sizes(file_names):
|
|
|
|
tmp_keysize = {}
|
|
|
|
""" don't collect signature alg for the self signed root """
|
|
with open(file_names[-1]) as cert_file:
|
|
cert_pem = cert_file.read()
|
|
|
|
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
|
|
|
|
pubkey = cert.get_pubkey()
|
|
if pubkey.type() == crypto.TYPE_RSA:
|
|
keysize['RSA ' + str(pubkey.bits())] += 1
|
|
tmp_keysize['RSA ' + str(pubkey.bits())] = 1
|
|
security_level = rsa_key_size_to_los(pubkey.bits())
|
|
elif pubkey.type() == crypto.TYPE_DSA:
|
|
keysize['DSA ' + str(pubkey.bits())] += 1
|
|
tmp_keysize['DSA ' + str(pubkey.bits())] = 1
|
|
security_level = rsa_key_size_to_los(pubkey.bits())
|
|
elif pubkey.type() == 408:
|
|
keysize['ECDSA ' + str(pubkey.bits())] += 1
|
|
tmp_keysize['ECDSA ' + str(pubkey.bits())] = 1
|
|
security_level = pubkey.bits()/2
|
|
else:
|
|
keysize[str(pubkey.type()) + ' ' + str(pubkey.bits())] += 1
|
|
security_level = 0
|
|
|
|
root_CA[get_cert_subject_name(cert)] += 1
|
|
|
|
""" exclude the self signed root and server cert from stats """
|
|
for f_name in file_names[1:-1]:
|
|
with open(f_name) as cert_file:
|
|
cert_pem = cert_file.read()
|
|
|
|
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
|
|
|
|
pubkey = cert.get_pubkey()
|
|
if pubkey.type() == crypto.TYPE_RSA:
|
|
keysize['RSA ' + str(pubkey.bits())] += 1
|
|
tmp_keysize['RSA ' + str(pubkey.bits())] = 1
|
|
c_key_level = rsa_key_size_to_los(pubkey.bits())
|
|
elif pubkey.type() == crypto.TYPE_DSA:
|
|
keysize['DSA ' + str(pubkey.bits())] += 1
|
|
tmp_keysize['DSA ' + str(pubkey.bits())] = 1
|
|
c_key_level = rsa_key_size_to_los(pubkey.bits())
|
|
elif pubkey.type() == 408:
|
|
keysize['ECDSA ' + str(pubkey.bits())] += 1
|
|
tmp_keysize['ECDSA ' + str(pubkey.bits())] = 1
|
|
c_key_level = pubkey.bits() / 2
|
|
else:
|
|
keysize[str(pubkey.type()) + ' ' + str(pubkey.bits())] += 1
|
|
c_key_level = 0
|
|
|
|
if security_level > c_key_level:
|
|
security_level = c_key_level
|
|
|
|
sig_alg[cert.get_signature_algorithm()] += 1
|
|
c_sig_level = sig_alg_to_los(cert.get_signature_algorithm())
|
|
if security_level > c_sig_level:
|
|
security_level = c_sig_level
|
|
|
|
intermediate_CA[get_cert_subject_name(cert)] += 1
|
|
|
|
for key_s in tmp_keysize:
|
|
keysize_per_chain[key_s] += 1
|
|
|
|
# XXX doesn't handle the situation in which the CA uses its certificate
|
|
# for a web server properly
|
|
with open(file_names[0]) as cert_file:
|
|
cert_pem = cert_file.read()
|
|
|
|
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
|
|
|
|
pubkey = cert.get_pubkey()
|
|
if pubkey.type() == crypto.TYPE_RSA:
|
|
c_key_level = rsa_key_size_to_los(pubkey.bits())
|
|
elif pubkey.type() == crypto.TYPE_DSA:
|
|
c_key_level = rsa_key_size_to_los(pubkey.bits())
|
|
elif pubkey.type() == 408:
|
|
c_key_level = pubkey.bits() / 2
|
|
else:
|
|
c_key_level = 0
|
|
|
|
if security_level > c_key_level:
|
|
security_level = c_key_level
|
|
|
|
c_sig_level = sig_alg_to_los(cert.get_signature_algorithm())
|
|
if security_level > c_sig_level:
|
|
security_level = c_sig_level
|
|
|
|
effective_security[security_level] += 1
|
|
|
|
|
|
all_CAs_context = Context()
|
|
all_CAs_context.load_verify_locations(capath=ca_certs_path)
|
|
trusted_context = Context()
|
|
trusted_context.load_verify_locations(capath=trust_path)
|
|
|
|
for r,d,flist in os.walk(path):
|
|
for f in flist:
|
|
|
|
server_chain_trusted = False
|
|
server_chain_complete = False
|
|
server_chains = []
|
|
chains_tested = []
|
|
valid = True
|
|
|
|
""" process the file """
|
|
f_abs = os.path.join(r,f)
|
|
with open(f_abs) as json_file:
|
|
""" Keep certificates in memory for a given file """
|
|
known_certs = {}
|
|
|
|
""" discard files that fail to load """
|
|
try:
|
|
results = json.load(json_file)
|
|
except ValueError:
|
|
continue
|
|
|
|
""" discard files with empty results """
|
|
if len(results['ciphersuite']) < 1:
|
|
continue
|
|
|
|
valid = True
|
|
|
|
""" loop over list of ciphers """
|
|
for entry in results['ciphersuite']:
|
|
|
|
""" skip entries which don't have certificate references """
|
|
if not 'certificates' in entry:
|
|
continue
|
|
|
|
""" skip entries for A(EC)DH suites """
|
|
if len(entry['certificates']) < 1:
|
|
continue
|
|
|
|
if not entry['certificates'] in chains_tested:
|
|
certs = []
|
|
|
|
for c_hash in entry['certificates']:
|
|
if c_hash in known_certs:
|
|
certs += [known_certs[c_hash]]
|
|
else:
|
|
path = get_path_for_hash(c_hash)
|
|
if path is None:
|
|
continue
|
|
cert = X509.load_cert(path)
|
|
known_certs[c_hash] = cert
|
|
certs += [cert]
|
|
|
|
if is_chain_trusted_at_all(certs):
|
|
ret,tmpchain = is_chain_trusted(entry['certificates'])
|
|
if ret:
|
|
server_chain_trusted = True
|
|
if not tmpchain in server_chains:
|
|
server_chains += [tmpchain]
|
|
if is_chain_complete(certs):
|
|
server_chain_complete = True
|
|
|
|
chains_tested += [entry['certificates']]
|
|
|
|
if server_chain_trusted:
|
|
if server_chain_complete:
|
|
chains["complete"] += 1
|
|
print "complete: " + f
|
|
else:
|
|
chains["incomplete"] += 1
|
|
print "incomplete: " + f
|
|
else:
|
|
chains["untrusted"] += 1
|
|
print "untrusted: " + f
|
|
|
|
if valid:
|
|
hosts += 1
|
|
|
|
for chain in server_chains:
|
|
collect_key_sizes(chain)
|
|
chain_len[str(len(chain))] += 1
|
|
if len(chain) == 1:
|
|
print "file with chain 1 long " + f_abs
|
|
total += 1
|
|
|
|
""" Display stats """
|
|
#print "openssl invocations: " + str(invocations["openssl"])
|
|
|
|
print "Statistics from " + str(total) + " chains provided by " + str(hosts) + " hosts"
|
|
|
|
print("\nServer provided chains Count Percent")
|
|
print("-------------------------+---------+-------")
|
|
for stat in sorted(chains):
|
|
percent = round(chains[stat] / hosts * 100, 4)
|
|
sys.stdout.write(stat.ljust(25) + " " + str(chains[stat]).ljust(10) + str(percent).ljust(4) + "\n")
|
|
|
|
print("\nTrusted chain statistics")
|
|
print("========================")
|
|
|
|
|
|
print("\nChain length Count Percent")
|
|
print("-------------------------+---------+-------")
|
|
for stat in sorted(chain_len):
|
|
percent = round(chain_len[stat] / total * 100, 4)
|
|
sys.stdout.write(stat.ljust(25) + " " + str(chain_len[stat]).ljust(10) + str(percent).ljust(4) + "\n")
|
|
|
|
print("\nCA key size in chains Count")
|
|
print("-------------------------+---------")
|
|
for stat in sorted(keysize):
|
|
sys.stdout.write(stat.ljust(25) + " " + str(keysize[stat]).ljust(10) + "\n")
|
|
|
|
print("\nChains with CA key Count Percent")
|
|
print("-------------------------+---------+-------")
|
|
for stat in sorted(keysize_per_chain):
|
|
percent = round(keysize_per_chain[stat] / total * 100, 4)
|
|
sys.stdout.write(stat.ljust(25) + " " + str(keysize_per_chain[stat]).ljust(10) + str(percent).ljust(4) + "\n")
|
|
|
|
print("\nSignature algorithm (ex. root) Count")
|
|
print("------------------------------+---------")
|
|
for stat in sorted(sig_alg):
|
|
sys.stdout.write(stat.ljust(30) + " " + str(sig_alg[stat]).ljust(10) + "\n")
|
|
|
|
print("\nEff. host cert chain LoS Count Percent")
|
|
print("-------------------------+---------+-------")
|
|
for stat in sorted(effective_security):
|
|
percent = round(effective_security[stat] / total * 100, 4)
|
|
sys.stdout.write(str(stat).ljust(25) + " " + str(effective_security[stat]).ljust(10) + str(percent).ljust(4) + "\n")
|
|
|
|
print("\nRoot CAs Count Percent")
|
|
print("---------------------------------------------+---------+-------")
|
|
for stat in sorted(root_CA):
|
|
percent = round(root_CA[stat] / total * 100, 4)
|
|
sys.stdout.write(stat.ljust(45)[0:45] + " " + str(root_CA[stat]).ljust(10) + str(percent).ljust(4) + "\n")
|
|
|
|
print("\nIntermediate CA Count Percent")
|
|
print("---------------------------------------------+---------+-------")
|
|
for stat in sorted(intermediate_CA):
|
|
percent = round(intermediate_CA[stat] / total * 100, 4)
|
|
sys.stdout.write(stat.ljust(45)[0:45] + " " + str(intermediate_CA[stat]).ljust(10) + str(percent).ljust(4) + "\n")
|