#!/usr/bin/env python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# Author: Hubert Kario - 2014
#
# Provenance: this is top1m/parse_CAs.py as created by the patch
#   "collect statistics about found certificates"
#   commit 68577f791830b44cd4fb2d446e7aa7a8a1ba120e
#   Hubert Kario, Fri, 25 Jul 2014 17:49:44 +0200
#
# The script walks the JSON scan results under `path`, checks every
# certificate chain referenced there against two CA stores (all known CAs,
# and trusted roots only) and prints statistics about the chains, key
# sizes, signature algorithms and CAs it found.

from __future__ import division

import json
import sys
from collections import defaultdict
import os
import re
import subprocess
from OpenSSL import crypto
from M2Crypto import X509, EVP
from m2ext import _m2ext
from m2ext import SSL
import glob

from pprint import pprint

# directory tree with the JSON scan results (one file per scanned host)
path = "./results/"
# every CA certificate seen, looked up as "<subject hash>.<n>"
ca_certs_path = "./ca_files"
# server certificates, stored as "<certificate hash>.pem"
certs_path = "./certs"
# only root CAs, no cached intermediate certs
trust_path = "./ca_trusted"

# Numeric key type that this script reports as ECDSA. 408 is OpenSSL's
# NID for id-ecPublicKey (EVP_PKEY_EC); the script compares against the
# number because it does not use a named pyOpenSSL constant for it.
_TYPE_EC = 408


# override m2ext implementation so that it is possible to provide additional
# certificates to be used during verification. Requires m2ext with a small fix
# from https://github.com/tomato42/m2ext/tree/extended_ctx_init
class Context(SSL.Context):
    def validate_certificate(self, cert, chain=None):
        """
        Validate a certificate using this SSL Context

        cert is the M2Crypto certificate to verify; chain is an optional
        X509_Stack of additional certificates that may be used to build
        the path. Returns True when verification succeeds, False when it
        fails, and raises SSL.SSLError when the verify call reports an
        error (negative return code).
        """
        if chain:
            ptr = chain._ptr()
        else:
            ptr = None
        store_ctx = X509.X509_Store_Context(_m2ext.x509_store_ctx_new(), _pyfree=1)
        _m2ext.x509_store_ctx_init(store_ctx.ctx,
                                   self.get_cert_store().store,
                                   cert.x509, ptr)
        rc = _m2ext.x509_verify_cert(store_ctx.ctx)
        if rc < 0:
            raise SSL.SSLError("Empty context")
        return rc != 0


# kept for compatibility; nothing in this script updates it
invocations = defaultdict(int)

# number of trusted chains found / number of usable result files
total = 0
hosts = 0
# counters keyed by the label printed in the report
chains = defaultdict(int)
chain_len = defaultdict(int)
keysize = defaultdict(int)
root_CA = defaultdict(int)
sig_alg = defaultdict(int)
intermediate_CA = defaultdict(int)

# caches for get_cert_hashes(), keyed by certificate file path
subject_hashes = {}
issuer_hashes = {}


def _name_hash(name):
    """Format an X509 name hash as 8 lower-case hex digits."""
    return "%.8x" % name.hash()


def _load_pem_cert(file_name):
    """Read file_name and parse it as a PEM certificate with pyOpenSSL."""
    with open(file_name) as cert_file:
        return crypto.load_certificate(crypto.FILETYPE_PEM, cert_file.read())


def get_cert_subject_name(cert):
    """
    Return a printable name for the subject of a pyOpenSSL certificate.

    The result is "(<subject hash>) " followed by the first CN component,
    or the first O component when there is no CN, or nothing when the
    subject has neither.
    """
    subject = cert.get_subject()
    commonName = None
    organization = None

    for elem, val in subject.get_components():
        if elem == "CN" and commonName is None:
            commonName = val
        if elem == "O" and organization is None:
            organization = val

    s_hash = "(" + _name_hash(subject) + ") "

    if commonName is not None:
        return s_hash + commonName
    if organization is not None:
        return s_hash + organization
    return s_hash


def get_cert_hashes(path):
    """
    Return (subject_hash, issuer_hash) for the PEM certificate at path.

    Results are cached per path in subject_hashes / issuer_hashes so each
    file is parsed only once.
    """
    if path in subject_hashes:
        return subject_hashes[path], issuer_hashes[path]

    srv_c = _load_pem_cert(path)

    # can't make M2Crypto to output OpenSSL-compatible hashes...
    subject_hash = _name_hash(srv_c.get_subject())
    issuer_hash = _name_hash(srv_c.get_issuer())

    subject_hashes[path] = subject_hash
    issuer_hashes[path] = issuer_hash

    return subject_hash, issuer_hash


def gen_cert_paths(paths):
    """
    Yield candidate certificate paths that extend paths up to a root.

    paths is a list of certificate file names, server certificate first.
    The last certificate's issuer hash is looked up in ca_certs_path and
    every matching file is tried as the next link. A path is yielded once
    its last certificate has identical subject and issuer hashes.
    """
    # failsafe in case of a loop in path resolution
    if len(paths) > 10:
        return

    subject_hash, issuer_hash = get_cert_hashes(paths[-1])

    if subject_hash == issuer_hash:
        yield paths
        return

    for ca_file in glob.glob(ca_certs_path + '/' + issuer_hash + ".*"):
        # never walk through the same file twice; the previous check
        # compared a whole candidate path against file names and so could
        # never match
        if ca_file in paths:
            continue
        for candidate in gen_cert_paths(paths + [ca_file]):
            yield candidate


def _verify_with(context, certs):
    """Verify certs[0] in context, offering certs[1:] as extra certificates."""
    stack = X509.X509_Stack()
    for extra in certs[1:]:
        stack.push(extra)

    return context.validate_certificate(certs[0], stack)


def is_chain_complete_f(file_names):
    """Like is_chain_complete(), but takes certificate file names."""
    return is_chain_complete([X509.load_cert(f_name) for f_name in file_names])


def is_chain_complete(certs):
    """
    Check whether certs[0] verifies against the trusted roots when only
    the remaining certificates in certs are available as intermediates.
    """
    return _verify_with(trusted_context, certs)


def get_path_for_hash(cert_hash):
    """
    Return the file holding the certificate with the given hash.

    The certificate directory is checked first, then the CA directory.
    Prints a message and returns None when neither has the file.
    """
    # first check the likely option: the cert dir
    f_name = certs_path + '/' + cert_hash + '.pem'
    if os.path.exists(f_name):
        return f_name

    # then try the unlikely option: ca directory
    f_name = ca_certs_path + '/' + cert_hash + '.pem'
    if os.path.exists(f_name):
        return f_name

    print("File with hash " + cert_hash + " is missing!")
    return None


def is_chain_trusted(cert_hashes):
    """
    Try to build a verifiable path for the first certificate in cert_hashes.

    Returns (True, list_of_file_names) for the first candidate path from
    gen_cert_paths() that verifies against the trusted roots, otherwise
    (False, None).
    """
    file_name = get_path_for_hash(cert_hashes[0])
    if file_name is None:
        return False, None

    for cert_paths in gen_cert_paths([file_name]):
        if is_chain_complete_f(cert_paths):
            return True, cert_paths

    return False, None


def is_chain_trusted_at_all(cert_list):
    """
    Check whether cert_list[0] verifies when every known CA certificate
    (not just the trusted roots) may be used, with the rest of cert_list
    offered as additional certificates.
    """
    return _verify_with(all_CAs_context, cert_list)


def _count_key_size(cert):
    """Add the public key type and size of cert to the keysize counter."""
    pubkey = cert.get_pubkey()
    key_type = pubkey.type()
    if key_type == crypto.TYPE_RSA:
        label = 'RSA'
    elif key_type == crypto.TYPE_DSA:
        label = 'DSA'
    elif key_type == _TYPE_EC:
        label = 'ECDSA'
    else:
        label = str(key_type)
    keysize[label + ' ' + str(pubkey.bits())] += 1


def collect_key_sizes(file_names):
    """
    Update the CA statistics for one trusted chain.

    file_names lists the chain's certificate files, server certificate
    first and root last. The root contributes its key size and name; the
    certificates strictly between server certificate and root contribute
    key size, signature algorithm and name.
    """
    # don't collect signature alg for the self signed root
    cert = _load_pem_cert(file_names[-1])
    _count_key_size(cert)
    root_CA[get_cert_subject_name(cert)] += 1

    # exclude the self signed root and server cert from stats
    for f_name in file_names[1:-1]:
        cert = _load_pem_cert(f_name)
        _count_key_size(cert)
        sig_alg[cert.get_signature_algorithm()] += 1
        intermediate_CA[get_cert_subject_name(cert)] += 1


def _load_chain(cert_hashes, known_certs):
    """
    Return the M2Crypto certificates for cert_hashes, in order.

    known_certs caches already loaded certificates by hash. Returns None
    when the file for any of the hashes cannot be found.
    """
    certs = []
    for c_hash in cert_hashes:
        cert = known_certs.get(c_hash)
        if cert is None:
            cert_file = get_path_for_hash(c_hash)
            if cert_file is None:
                # the chain cannot be checked without all of its files
                return None
            cert = X509.load_cert(cert_file)
            known_certs[c_hash] = cert
        certs.append(cert)
    return certs


def _examine_host(results_file):
    """
    Evaluate all certificate chains listed in one JSON result file.

    Returns None when the file does not parse or has no ciphersuite
    entries. Otherwise returns (status, trusted_chains): status is
    "complete", "incomplete" or "untrusted", and trusted_chains is the
    list of distinct verified paths (lists of file names).
    """
    with open(results_file) as json_file:
        # discard files that fail to load
        try:
            results = json.load(json_file)
        except ValueError:
            return None

    # discard files with empty results
    if len(results['ciphersuite']) < 1:
        return None

    # Keep certificates in memory for a given file
    known_certs = {}
    chains_tested = []
    server_chains = []
    server_chain_trusted = False
    server_chain_complete = False

    # loop over list of ciphers
    for entry in results['ciphersuite']:
        # skip entries which don't have certificate references and
        # entries for A(EC)DH suites (empty certificate list)
        cert_hashes = entry.get('certificates')
        if not cert_hashes:
            continue

        # every distinct chain is evaluated only once per file
        if cert_hashes in chains_tested:
            continue
        chains_tested.append(cert_hashes)

        certs = _load_chain(cert_hashes, known_certs)
        if certs is None or not is_chain_trusted_at_all(certs):
            continue

        ret, tmpchain = is_chain_trusted(cert_hashes)
        if not ret:
            continue

        server_chain_trusted = True
        if tmpchain not in server_chains:
            server_chains.append(tmpchain)
        if is_chain_complete(certs):
            server_chain_complete = True

    if not server_chain_trusted:
        status = "untrusted"
    elif server_chain_complete:
        status = "complete"
    else:
        status = "incomplete"

    return status, server_chains


def _percent(count, whole):
    """Return count as a percentage of whole, or 0.0 when whole is zero."""
    if not whole:
        return 0.0
    return round(count / whole * 100, 4)


def _print_table(title, counts, whole, width=25, truncate=False):
    """
    Print one report table: header, separator and a row per key of counts.

    whole is the denominator for the percent column, width the width of
    the label column; with truncate set, labels longer than width are cut.
    """
    print("\n" + title.ljust(width + 1) + "Count".ljust(10) + "Percent")
    print("-" * width + "+---------+-------")
    for stat in sorted(counts):
        label = stat.ljust(width)
        if truncate:
            label = label[0:width]
        percent = _percent(counts[stat], whole)
        sys.stdout.write(label + " " + str(counts[stat]).ljust(10) +
                         str(percent).ljust(4) + "\n")


all_CAs_context = Context()
all_CAs_context.load_verify_locations(capath=ca_certs_path)
trusted_context = Context()
trusted_context.load_verify_locations(capath=trust_path)

for dir_name, _sub_dirs, file_list in os.walk(path):
    for file_name in file_list:
        # process the file
        f_abs = os.path.join(dir_name, file_name)
        outcome = _examine_host(f_abs)
        if outcome is None:
            continue

        host_status, host_chains = outcome
        chains[host_status] += 1
        hosts += 1

        for chain in host_chains:
            collect_key_sizes(chain)
            chain_len[str(len(chain))] += 1
            if len(chain) == 1:
                print("file with chain 1 long " + f_abs)
            total += 1

# Display stats
print("Statistics from " + str(total) + " chains provided by " + str(hosts) + " hosts")

_print_table("Server provided chains", chains, hosts)

print("\nTrusted chain statistics")
print("========================")

_print_table("Chain length", chain_len, total)
_print_table("CA Key Size", keysize, total)
_print_table("Root CAs", root_CA, total, width=45, truncate=True)
_print_table("Signature algorithm", sig_alg, total)
_print_table("Intermediate CA", intermediate_CA, total, width=45, truncate=True)