diff --git a/top1m/parse_CAs.py b/top1m/parse_CAs.py index 72c2b8f..3b31183 100644 --- a/top1m/parse_CAs.py +++ b/top1m/parse_CAs.py @@ -18,37 +18,7 @@ import json import sys from collections import defaultdict import os -import re -import subprocess from OpenSSL import crypto -from M2Crypto import X509, EVP -from m2ext import _m2ext -from m2ext import SSL -import glob - -from pprint import pprint - -# override m2ext implementation so that it is possible to provide additional -# certificates to be used during verification. Requires m2ext with a small fix -# from https://github.com/tomato42/m2ext/tree/extended_ctx_init -class Context(SSL.Context): - def validate_certificate(self, cert, chain=None): - """ - Validate a certificate using this SSL Context - """ - if chain: - ptr = chain._ptr() - else: - ptr = None - store_ctx = X509.X509_Store_Context(_m2ext.x509_store_ctx_new(), _pyfree=1) - _m2ext.x509_store_ctx_init(store_ctx.ctx, - self.get_cert_store().store, - cert.x509, ptr) - rc = _m2ext.x509_verify_cert(store_ctx.ctx) - if rc < 0: - raise SSL.SSLError("Empty context") - return rc != 0 - invocations = defaultdict(int) @@ -86,98 +56,15 @@ def get_cert_subject_name(cert): else: return s_hash -def get_cert_hashes(path): - if path in subject_hashes: - return subject_hashes[path], issuer_hashes[path] - - with open(path) as srv_c_f: - srv_c_pem = srv_c_f.read() - - srv_c = crypto.load_certificate(crypto.FILETYPE_PEM, srv_c_pem) - - # can't make M2Crypto to output OpenSSL-compatible hashes... - subject_hash = ("%0.8X" % srv_c.get_subject().hash()).lower() - issuer_hash = ("%0.8X" % srv_c.get_issuer().hash()).lower() - - subject_hashes[path] = subject_hash - issuer_hashes[path] = issuer_hash - - return subject_hash, issuer_hash - -def gen_cert_paths(paths): - - # failsafe in case of a loop in path resolution - if len(paths) > 10: - return - - subject_hash, issuer_hash = get_cert_hashes(paths[-1]) - - if subject_hash == issuer_hash: - yield paths - else: - for ca_file in glob.glob(ca_certs_path + '/' + issuer_hash + ".*"): - for perm in gen_cert_paths(paths + [ca_file]): - if not perm in paths: - yield perm - -def is_chain_complete_f(file_names): - - stack = X509.X509_Stack() - for f_name in file_names[1:]: - cert = X509.load_cert(f_name) - stack.push(cert) - - cert = X509.load_cert(file_names[0]) - - return trusted_context.validate_certificate(cert, stack) - -def is_chain_complete(certs): - - stack = X509.X509_Stack() - - for cert in certs[1:]: - stack.push(cert) - - return trusted_context.validate_certificate(certs[0], stack) - -def is_chain_trusted(cert_hashes): - - c_hash = cert_hashes[0] - """ first check the likely option: the cert dir """ - file_name = certs_path + '/' + c_hash + '.pem' - if not os.path.exists(file_name): - """ then try the unlikely option: ca directory """ - file_name = ca_certs_path + '/' + c_hash + '.pem' - if not os.path.exists(file_name): - print "File with hash " + c_hash + " is missing!" - return False,None - - for cert_paths in gen_cert_paths([ file_name ]): - if is_chain_complete_f(cert_paths): - return True,cert_paths - - return False,None - def get_path_for_hash(cert_hash): - f_name = certs_path + '/' + c_hash + '.pem' + f_name = certs_path + '/' + cert_hash + '.pem' if not os.path.exists(f_name): - f_name = ca_certs_path + '/' + c_hash + '.pem' + f_name = ca_certs_path + '/' + cert_hash + '.pem' if not os.path.exists(f_name): #print "File with hash " + c_hash + " is missing!" return None return f_name -def is_chain_trusted_at_all(cert_list): - certs = [] - stack = X509.X509_Stack() - - cert = cert_list[0] - - for ca in cert_list[1:]: - stack.push(ca) - - return all_CAs_context.validate_certificate(cert, stack) - """ convert RSA and DSA key sizes to estimated Level of security """ def rsa_key_size_to_los(size): if size < 760: @@ -304,94 +191,84 @@ def collect_key_sizes(file_names): effective_security[security_level] += 1 -all_CAs_context = Context() -all_CAs_context.load_verify_locations(capath=ca_certs_path) -trusted_context = Context() -trusted_context.load_verify_locations(capath=trust_path) +with open("parsed") as res_file: + for line in res_file: + try: + res = json.loads(line) + except ValueError as e: + print "can't process line: " + line + continue -for r,d,flist in os.walk(path): - for f in flist: + f=res - server_chain_trusted = False - server_chain_complete = False - server_chains = [] - chains_tested = [] - valid = True + try: + server_chain_trusted = False + server_chain_complete = False + server_chains = [] + valid = False - """ process the file """ - f_abs = os.path.join(r,f) - with open(f_abs) as json_file: """ Keep certificates in memory for a given file """ known_certs = {} - """ discard files that fail to load """ - try: - results = json.load(json_file) - except ValueError: + if not "chains" in f: continue - """ discard files with empty results """ - if len(results['ciphersuite']) < 1: - continue + results = f["chains"] - valid = True + """ discard hosts with empty results """ + if len(results) < 1: + continue """ loop over list of ciphers """ - for entry in results['ciphersuite']: + for entry in results: - """ skip entries which don't have certificate references """ - if not 'certificates' in entry: + """ skip invalid results """ + if not 'chain' in entry: continue - """ skip entries for A(EC)DH suites """ - if len(entry['certificates']) < 1: + valid = True + + if entry['chain'] == "untrusted": continue - if not entry['certificates'] in chains_tested: - certs = [] + if entry['chain'] == "complete": + server_chain_complete = True + server_chain_trusted = True - for c_hash in entry['certificates']: - if c_hash in known_certs: - certs += [known_certs[c_hash]] - else: - path = get_path_for_hash(c_hash) - if path is None: - continue - cert = X509.load_cert(path) - known_certs[c_hash] = cert - certs += [cert] + if entry['chain'] == "incomplete": + server_chain_trusted = True - if is_chain_trusted_at_all(certs): - ret,tmpchain = is_chain_trusted(entry['certificates']) - if ret: - server_chain_trusted = True - if not tmpchain in server_chains: - server_chains += [tmpchain] - if is_chain_complete(certs): - server_chain_complete = True + server_chains += [entry['certificates']] - chains_tested += [entry['certificates']] - - if server_chain_trusted: - if server_chain_complete: - chains["complete"] += 1 - print "complete: " + f + if server_chain_trusted: + if server_chain_complete: + chains["complete"] += 1 + print "complete: " + f['host'] + else: + chains["incomplete"] += 1 + print "incomplete: " + f['host'] else: - chains["incomplete"] += 1 - print "incomplete: " + f - else: - chains["untrusted"] += 1 - print "untrusted: " + f + chains["untrusted"] += 1 + print "untrusted: " + f['host'] - if valid: - hosts += 1 + if valid: + hosts += 1 - for chain in server_chains: - collect_key_sizes(chain) - chain_len[str(len(chain))] += 1 - if len(chain) == 1: - print "file with chain 1 long " + f_abs - total += 1 + for chain in server_chains: + f_names = [] + for hash in chain: + path = get_path_for_hash(hash) + f_names += [path] + + collect_key_sizes(f_names) + chain_len[str(len(chain))] += 1 + if len(chain) == 1: + sys.stderr.write("file with chain 1 long: " + line) + total += 1 + except TypeError as e: + + sys.stderr.write("can't process: " + line) + continue """ Display stats """ #print "openssl invocations: " + str(invocations["openssl"])