2
0
mirror of https://github.com/mozilla/cipherscan.git synced 2024-11-22 22:33:40 +01:00

use pre-parsed data outputted by the C application

This commit is contained in:
Hubert Kario 2014-08-03 17:19:56 +02:00 committed by Hubert Kario
parent 0591829ed1
commit 9a956dc5a5

View File

@ -18,37 +18,7 @@ import json
import sys import sys
from collections import defaultdict from collections import defaultdict
import os import os
import re
import subprocess
from OpenSSL import crypto from OpenSSL import crypto
from M2Crypto import X509, EVP
from m2ext import _m2ext
from m2ext import SSL
import glob
from pprint import pprint
# override m2ext implementation so that it is possible to provide additional
# certificates to be used during verification. Requires m2ext with a small fix
# from https://github.com/tomato42/m2ext/tree/extended_ctx_init
class Context(SSL.Context):
def validate_certificate(self, cert, chain=None):
"""
Validate a certificate using this SSL Context
"""
if chain:
ptr = chain._ptr()
else:
ptr = None
store_ctx = X509.X509_Store_Context(_m2ext.x509_store_ctx_new(), _pyfree=1)
_m2ext.x509_store_ctx_init(store_ctx.ctx,
self.get_cert_store().store,
cert.x509, ptr)
rc = _m2ext.x509_verify_cert(store_ctx.ctx)
if rc < 0:
raise SSL.SSLError("Empty context")
return rc != 0
invocations = defaultdict(int) invocations = defaultdict(int)
@ -86,98 +56,15 @@ def get_cert_subject_name(cert):
else: else:
return s_hash return s_hash
def get_cert_hashes(path):
if path in subject_hashes:
return subject_hashes[path], issuer_hashes[path]
with open(path) as srv_c_f:
srv_c_pem = srv_c_f.read()
srv_c = crypto.load_certificate(crypto.FILETYPE_PEM, srv_c_pem)
# can't make M2Crypto to output OpenSSL-compatible hashes...
subject_hash = ("%0.8X" % srv_c.get_subject().hash()).lower()
issuer_hash = ("%0.8X" % srv_c.get_issuer().hash()).lower()
subject_hashes[path] = subject_hash
issuer_hashes[path] = issuer_hash
return subject_hash, issuer_hash
def gen_cert_paths(paths):
# failsafe in case of a loop in path resolution
if len(paths) > 10:
return
subject_hash, issuer_hash = get_cert_hashes(paths[-1])
if subject_hash == issuer_hash:
yield paths
else:
for ca_file in glob.glob(ca_certs_path + '/' + issuer_hash + ".*"):
for perm in gen_cert_paths(paths + [ca_file]):
if not perm in paths:
yield perm
def is_chain_complete_f(file_names):
stack = X509.X509_Stack()
for f_name in file_names[1:]:
cert = X509.load_cert(f_name)
stack.push(cert)
cert = X509.load_cert(file_names[0])
return trusted_context.validate_certificate(cert, stack)
def is_chain_complete(certs):
stack = X509.X509_Stack()
for cert in certs[1:]:
stack.push(cert)
return trusted_context.validate_certificate(certs[0], stack)
def is_chain_trusted(cert_hashes):
c_hash = cert_hashes[0]
""" first check the likely option: the cert dir """
file_name = certs_path + '/' + c_hash + '.pem'
if not os.path.exists(file_name):
""" then try the unlikely option: ca directory """
file_name = ca_certs_path + '/' + c_hash + '.pem'
if not os.path.exists(file_name):
print "File with hash " + c_hash + " is missing!"
return False,None
for cert_paths in gen_cert_paths([ file_name ]):
if is_chain_complete_f(cert_paths):
return True,cert_paths
return False,None
def get_path_for_hash(cert_hash): def get_path_for_hash(cert_hash):
f_name = certs_path + '/' + c_hash + '.pem' f_name = certs_path + '/' + cert_hash + '.pem'
if not os.path.exists(f_name): if not os.path.exists(f_name):
f_name = ca_certs_path + '/' + c_hash + '.pem' f_name = ca_certs_path + '/' + cert_hash + '.pem'
if not os.path.exists(f_name): if not os.path.exists(f_name):
#print "File with hash " + c_hash + " is missing!" #print "File with hash " + c_hash + " is missing!"
return None return None
return f_name return f_name
def is_chain_trusted_at_all(cert_list):
certs = []
stack = X509.X509_Stack()
cert = cert_list[0]
for ca in cert_list[1:]:
stack.push(ca)
return all_CAs_context.validate_certificate(cert, stack)
""" convert RSA and DSA key sizes to estimated Level of security """ """ convert RSA and DSA key sizes to estimated Level of security """
def rsa_key_size_to_los(size): def rsa_key_size_to_los(size):
if size < 760: if size < 760:
@ -304,94 +191,84 @@ def collect_key_sizes(file_names):
effective_security[security_level] += 1 effective_security[security_level] += 1
all_CAs_context = Context() with open("parsed") as res_file:
all_CAs_context.load_verify_locations(capath=ca_certs_path) for line in res_file:
trusted_context = Context() try:
trusted_context.load_verify_locations(capath=trust_path) res = json.loads(line)
except ValueError as e:
print "can't process line: " + line
continue
for r,d,flist in os.walk(path): f=res
for f in flist:
server_chain_trusted = False try:
server_chain_complete = False server_chain_trusted = False
server_chains = [] server_chain_complete = False
chains_tested = [] server_chains = []
valid = True valid = False
""" process the file """
f_abs = os.path.join(r,f)
with open(f_abs) as json_file:
""" Keep certificates in memory for a given file """ """ Keep certificates in memory for a given file """
known_certs = {} known_certs = {}
""" discard files that fail to load """ if not "chains" in f:
try:
results = json.load(json_file)
except ValueError:
continue continue
""" discard files with empty results """ results = f["chains"]
if len(results['ciphersuite']) < 1:
continue
valid = True """ discard hosts with empty results """
if len(results) < 1:
continue
""" loop over list of ciphers """ """ loop over list of ciphers """
for entry in results['ciphersuite']: for entry in results:
""" skip entries which don't have certificate references """ """ skip invalid results """
if not 'certificates' in entry: if not 'chain' in entry:
continue continue
""" skip entries for A(EC)DH suites """ valid = True
if len(entry['certificates']) < 1:
if entry['chain'] == "untrusted":
continue continue
if not entry['certificates'] in chains_tested: if entry['chain'] == "complete":
certs = [] server_chain_complete = True
server_chain_trusted = True
for c_hash in entry['certificates']: if entry['chain'] == "incomplete":
if c_hash in known_certs: server_chain_trusted = True
certs += [known_certs[c_hash]]
else:
path = get_path_for_hash(c_hash)
if path is None:
continue
cert = X509.load_cert(path)
known_certs[c_hash] = cert
certs += [cert]
if is_chain_trusted_at_all(certs): server_chains += [entry['certificates']]
ret,tmpchain = is_chain_trusted(entry['certificates'])
if ret:
server_chain_trusted = True
if not tmpchain in server_chains:
server_chains += [tmpchain]
if is_chain_complete(certs):
server_chain_complete = True
chains_tested += [entry['certificates']] if server_chain_trusted:
if server_chain_complete:
if server_chain_trusted: chains["complete"] += 1
if server_chain_complete: print "complete: " + f['host']
chains["complete"] += 1 else:
print "complete: " + f chains["incomplete"] += 1
print "incomplete: " + f['host']
else: else:
chains["incomplete"] += 1 chains["untrusted"] += 1
print "incomplete: " + f print "untrusted: " + f['host']
else:
chains["untrusted"] += 1
print "untrusted: " + f
if valid: if valid:
hosts += 1 hosts += 1
for chain in server_chains: for chain in server_chains:
collect_key_sizes(chain) f_names = []
chain_len[str(len(chain))] += 1 for hash in chain:
if len(chain) == 1: path = get_path_for_hash(hash)
print "file with chain 1 long " + f_abs f_names += [path]
total += 1
collect_key_sizes(f_names)
chain_len[str(len(chain))] += 1
if len(chain) == 1:
sys.stderr.write("file with chain 1 long: " + line)
total += 1
except TypeError as e:
sys.stderr.write("can't process: " + line)
continue
""" Display stats """ """ Display stats """
#print "openssl invocations: " + str(invocations["openssl"]) #print "openssl invocations: " + str(invocations["openssl"])