diff options
Diffstat (limited to 'src/kernelcheck/lib/kernellib.py')
-rw-r--r-- | src/kernelcheck/lib/kernellib.py | 616 |
1 files changed, 616 insertions, 0 deletions
diff --git a/src/kernelcheck/lib/kernellib.py b/src/kernelcheck/lib/kernellib.py new file mode 100644 index 0000000..cf2e6ab --- /dev/null +++ b/src/kernelcheck/lib/kernellib.py @@ -0,0 +1,616 @@ +#!/usr/bin/env python +# kernel-check -- Kernel security information +# Copyright 2009-2009 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +from __future__ import with_statement +from contextlib import closing +import cStringIO +import datetime +import inspect +import logging +import mmap +import os +import portage +import re +import urllib +import xml.etree.cElementTree + + +ARCHES = [ + 'all', 'alpha', 'amd64', 'amd64-fbsd', 'arm', 'hppa', 'ia64', 'm68k', + 'mips', 'ppc', 'ppc64', 's390', 'sh', 'sparc', 'sparc-fbsd', 'x86', + 'x86-fbsd' +] + +REGEX = { + 'gp_version' : re.compile(r'(?<=K_GENPATCHES_VER\=\").+(?=\")'), + 'gp_want' : re.compile(r'(?<=K_WANT_GENPATCHES\=\").+(?=\")'), + 'k_version' : re.compile(r'^((?:\d{1,2}\.){0,4}\d{1,2})(-.*)?$'), + 'rc_kernel' : re.compile(r'^rc\d{1,3}$'), + 'git_kernel' : re.compile(r'^git(\d{1,3})$'), + 'r_kernel' : re.compile(r'^r\d{1,3}$') +} + +SUPPORTED = ['gentoo', 'vanilla', 'hardened'] + +KERNEL_TYPES = [ + 'aa', 'acpi', 'ac', 'alpha', 'arm', 'as', 'cell', 'ck', 'compaq', 'crypto', + 'development', 'gaming','gentoo-dev', 'gentoo', 'gentoo-test', 'gfs', + 'git', 'grsec', 'gs', 'hardened-dev', 'hardened', 'hppa-dev', 'hppa', + 'ia64', 'kurobox', 'linux', 'lolo', 'mips-prepatch', 'mips', 'mjc', 'mm', + 'mosix', 'openblocks', 'openmosix','openvz', 'pac', 'pegasos-dev', + 'pegasos', 'pfeifer', 'planet-ccrma', 'ppc64', 'ppc-development', + 'ppc-dev', 'ppc', 'redhat', 'rsbac-dev', 'rsbac', 'selinux', 'sh', + 'sparc-dev', 'sparc', 'suspend2', 'systrace', 'tuxonice', 'uclinux', + 'usermode', 'vanilla-prepatch', 'vanilla', 'vanilla-tiny', 'vserver-dev', + 'vserver', 'win4lin', 'wolk-dev', 'wolk', 'xbox', 'xen', 'xfs' +] + +VERSION = '0.3.10' +DEBUG = False +FILEPATH = os.path.dirname(os.path.realpath(__file__)) +PORTDIR = portage.settings['PORTDIR'] +DIR = { + 'tmp' : os.path.join(FILEPATH, 'tmp'), + 'out' : os.path.join(PORTDIR, 'metadata', 'kernel'), + 'bug' : os.path.join(FILEPATH, 'tmp', 'bug'), + 'nvd' : os.path.join(FILEPATH, 'tmp', 'nvd') +} + +def BUG_ON(msg, e): + if DEBUG: + print 'DEBUG line %s in %s(): %s -> %s' % (inspect.stack()[1][2], + inspect.stack()[1][3], msg, e) + + +class Evaluation: + """Evaluation class + + Provides information about the vulnerability of a kernel. + """ + + read = int() + arch = int() + affected = list() + unaffected = list() + + def __init__(self): + self.affected = list() + self.unaffected = list() + + +class Comparison: #TODO Check if deprecated + """Comparison class + """ + + fixed = int() + new = list() + #TODO add more information + + def __init__(self): + self.fixed = list() + self.new = list() + + +class Cve: + """Common vulnerabilities and exposures class + + Contains all important information about a cve. + + Attributes: + cve: a string represeting the cve number of the class. + desc: a string providing a detailed description for the cve. + published: a string representing the original cve release date. + refs: a list of external references. + severity: a string representing the cve severity. + score: a floating point representing cvss base score. + vector: a string providing the cve access vector. + """ + + cve = str() + desc = str() + published = str() + refs = list() + severity = str() + score = float() + vector = str() + + def __init__(self, cve): + self.cve = cve + + def __eq__(self, diff): + return (self.cve == diff.cve) #FIXME is this enough? + + def __ne__(self, diff): + return not self.__eq__(diff) + + +class Genpatch: + 'Genpatch class' + + base = bool() + extras = bool() + kernel = None + version = str() + + def __init__(self, version): + self.version = version + + + def __repr__(self): + if self.base and self.extras: + return 'base extras' + if self.base: + return 'base' + if self.extras: + return 'extras' + + + def __eq__(self, diff): + if self.kernel == diff.kernel: + return (''.join((str(self.base), str(self.extras), self.version)) + == ''.join((str(diff.base), str(diff.extras), diff.version))) + else: + return False + + + def __ne__(self, diff): + return not self.__eq__(diff) + + +class Kernel: + 'Kernel class' + + revision = str() + source = str() + version = str() + genpatch = None + + def __init__(self, source): + self.source = source + + + def __repr__(self): + return str(self.version + '-' + self.source + '-' + self.revision) + + + def __eq__(self, diff): + return (''.join((self.revision, self.source, + self.version, str(self.genpatch))) + == ''.join((diff.revision, diff.source, + diff.version, str(diff.genpatch)))) + + + def __ne__(self, diff): + return not self.__eq__(diff) + + +class Vulnerability: + 'Vulnerability class' + + arch = str() + bugid = int() + cvelist = list() + cves = list() + affected = list() + reported = str() + reporter = str() + status = str() + + def __init__(self, bugid): + self.bugid = bugid + + def __eq__(self, diff): + return (self.bugid == diff.bugid) #FIXME is this enough? + + def __ne__(self, diff): + return not self.__eq__(diff) + + +class Interval: + """Interval class + + Provides one interval entry for a vulnerability + + Attributes: + name: a string representing the name of the kernel release + lower: a string representing the lower boundary of the interval + upper: a string representing the upper boundary of the interval + lower_i: a boolean indicating if the lower boundary is inclusive + upper_i: a boolean indicating if the upper boundary is inclusive + """ + + name = str() + lower = str() + upper = str() + lower_i = bool() + upper_i = bool() + + def __init__(self, name, lower, upper, lower_i, upper_i): + if name == 'linux' or name == 'genpatches': + pass + elif name == 'gp': + name = 'genpatches' + + name = name.replace('-sources', '') + + self.name = name + self.lower_i = lower_i + self.upper_i = upper_i + if name == 'genpatches': + if lower: + self.lower = lower.replace('-','.') + else: + self.lower = lower + if upper: + self.upper = upper.replace('-','.') + else: + self.upper = upper + else: + self.lower = lower + self.upper = upper + + + def __repr__(self): + interval = str(self.name) + interval += ' ' + if self.lower and self.lower_i: + interval += '>=%s ' % (self.lower) + if self.lower and not self.lower_i: + interval += '>%s ' % (self.lower) + if self.upper and self.upper_i: + interval += '<=%s' % (self.upper) + if self.upper and not self.upper_i: + interval += '<%s' % (self.upper) + + return interval + + +def interval_from_xml(root): + 'Returns an interval from xml' + + name = root.get('source') + + lower = '' + upper = '' + lower_i = False + upper_i = False + + if root.find('lower') is not None: + lower = root.find('lower').text + lower_i = (root.find('lower').get('inclusive') == 'true') + + if root.find('upper') is not None: + upper = root.find('upper').text + upper_i = (root.find('upper').get('inclusive') == 'true') + + return Interval(name, lower, upper, lower_i, upper_i) + + +#TODO Use exceptions +def is_in_interval(interval, kernel, bugid=None): + 'Returns True if the given version is inside our specified interval' + + version = str() + + if interval.name == 'linux': + version = kernel.version + + elif interval.name == 'genpatches': + version = kernel.version.replace('-', '.') + + elif interval.name == 'hardened': + version = kernel.version #TODO is this correct? + + elif interval.name == 'xen': + version = kernel.version #TODO is this correct? + + elif interval.name == 'vserver': + return False + + else: + BUG_ON(interval.name + ' ' + bugid.bugid) + return False + + for item in ['lower', 'upper']: + if getattr(interval, item): + result = portage.versions.vercmp(version, getattr(interval, item)) + + if result == None: + BUG_ON('Could not compare %s and %s' % + (getattr(interval, item),version)) + + if result == 0 and not getattr(interval, item + '_i'): + return False + + if result == 0 and getattr(interval, item + '_i'): + return True + + if item == 'lower' and result < 0: + return False + + if item == 'upper' and result > 0: + return False + + return True + + +def get_genpatch(directory, kernel): + 'Returns a list containing all genpatches from portage' + + patches = list() + directory = os.path.join(directory, 'sys-kernel') + + for sources in os.listdir(directory): + if '-sources' in sources: + for ebuild in os.listdir(os.path.join(directory, sources)): + if '.ebuild' in ebuild: + genpatch = extract_genpatch(ebuild, directory, sources) + + if genpatch is not None: + if genpatch.kernel == kernel: + return genpatch + + return None + + +def extract_genpatch(ebuild, directory, sources): + 'Returns a genpatch from an ebuild' + + pkg = portage.versions.catpkgsplit('sys-kernel/%s' % ebuild[:-7]) + + with open(os.path.join(directory, sources, ebuild), 'r') as ebuild_file: + content = ebuild_file.read() + + try: + genpatch_v = REGEX['gp_version'].findall(content)[0] + genpatch_w = REGEX['gp_want'].findall(content)[0] + except: + return None + + kernel = Kernel(pkg[1].replace('-sources', '')) + kernel.version = pkg[2] + kernel.revision = pkg[3] + + genpatch = Genpatch(pkg[2] + '-' + genpatch_v) + genpatch.kernel = kernel + genpatch.base = ('base' in genpatch_w) + genpatch.extras = ('extras' in genpatch_w) + + return genpatch + + +def parse_cve_files(directory): + 'Returns all bug files as list' + + files = list() + + if (os.path.exists(directory)): + for item in os.listdir(directory): + try: + cve_file = read_cve_file(directory, item[:-4]) + if cve_file is not None: + files.append(cve_file) + + except AttributeError, e: + BUG_ON(item, e) + + return files + + +def find_cve(cve, directory): + 'Returns a bug containing the cve' + + for item in parse_cve_files(directory): + for cves in item.cves: + if cve == cves.cve: + return item + + return None + + +def eval_cve_files(directory, kernel, arch): + 'Returns a vulnerabilty evaluation' + + files = parse_cve_files(directory) + + if not files: + return None + + evaluation = Evaluation() + + for item in files: + evaluation.read += 1 + + if item.arch not in ARCHES: + BUG_ON('[Error] Wrong architecture %s in bugid: %s' % + (item.arch, item.bugid)) + + if item.arch != arch and item.arch != 'all': + evaluation.unaffected.append(item) + else: + evaluation.arch += 1 + + if is_affected(item.affected, kernel, item): + evaluation.affected.append(item) + else: + evaluation.unaffected.append(item) + + return evaluation + +#TODO Remove item +def is_affected(interval_list, kernel, item): + 'Returns true if a kernel is affected' + + kernel_gentoo = (kernel.source == 'gentoo' and kernel.genpatch is not None) + kernel_affected = False + kernel_linux_affected = False + kernel_gp_affected = False + linux_interval = False + gentoo_interval = False + + for interval in interval_list: + if interval.name == 'genpatches': + gentoo_interval = True + if kernel_gentoo: + if is_in_interval(interval, kernel.genpatch, item): + kernel_gp_affected = True + + elif interval.name == 'linux': + linux_interval = True + if is_in_interval(interval, kernel, item): + kernel_linux_affected = True + + else: + pass #TODO + + if linux_interval: + if kernel_linux_affected: + if gentoo_interval and kernel_gentoo: + if kernel_gp_affected: + kernel_affected = True + else: + kernel_affected = False + else: + kernel_affected = True + else: + kernel_affected = False + else: + if kernel_gentoo and gentoo_interval: + if kernel_gp_affected: + kernel_affected = True + else: + kernel_affected = False + #TODO Implement else for hardend/xen/expand + + return kernel_affected + + +def compare_evaluation(kernel, compare): + 'Creates a comparison out of two evaluation instances' + + comparison = Comparison() + + if kernel.read != compare.read or kernel.arch != compare.arch: + BUG_ON('Kernels do not match: %s %s' % (kernel1.read, kernel2.read)) + return + + for item in kernel.affected: + if item not in compare.affected: + comparison.fixed.append(item) + + for item in compare.affected: + if item not in kernel.affected: + comparison.new.append(item) + + return comparison + + +def read_cve_file(directory, bugid): + 'Read a bug file created by collector' + + cves = list() + affected = list() + + filename = os.path.join(directory, bugid + '.xml') + + try: + with open(filename, 'r+') as xml_data: + memory_map = mmap.mmap(xml_data.fileno(), 0) + root = xml.etree.cElementTree.parse(memory_map).getroot() + except IOError, e: + BUG_ON(filename, e) + return None + + bugroot = root.find('bug') + + vul = Vulnerability(bugroot.find('bugid').text) + + for elem in ['arch', 'reported', 'reporter', 'status']: + setattr(vul, elem, bugroot.find(elem).text) + + affectedroot = bugroot.find('affected') + for item in affectedroot: + interval = interval_from_xml(item) + affected.append(interval) + + vul.affected = affected + + for item in root: + if item.tag == 'cve': + cve = Cve(item.find('cve').text) + if cve is None: + return None + + for elem in ['desc', 'published', 'refs', + 'severity', 'score', 'vector']: + element = item.find(elem) + if element is not None: + setattr(cve, elem, element.text) + else: + BUG_ON(filename, '(%s, \'No such element\')' % elem) + + cves.append(cve) + vul.cves = cves + + return vul + +#TODO Use Exceptions +def extract_version(release): + 'Extracts revision, source and version out of a release tag' + + match = REGEX['k_version'].match(release) + if not match: + BUG_ON('[Error] Release %s does not contain any valid information' % + release) + return None + + version, rest = match.groups() + + kernel = Kernel('vanilla') + kernel.revision = 'r0' + kernel.version = version + + for elem in (rest or '').split('-'): + if elem == 'sources': + pass + elif REGEX['rc_kernel'].match(elem): + kernel.version += '_' + elem + elif REGEX['git_kernel'].match(elem): + kernel.source = 'git' + kernel.revision = 'r' + REGEX['gitd'].match(elem).groups()[0] + elif REGEX['r_kernel'].match(elem): + kernel.revision = elem + elif elem in KERNEL_TYPES: + kernel.source = elem + elif elem != '': + BUG_ON('[Error] Dropping unknown version component \'%s\', \ + probably local tag.' % elem) + + return kernel + + +def all_version(source): + """ Given a kernel source name (e.g. vanilla), returns a Kernel object + for the latest revision in the tree, or None if none exists. """ + + versions = list() + + porttree = portage.db[portage.root]['porttree'] + matches = porttree.dbapi.xmatch('match-all', + 'sys-kernel/%s-sources' % source) + + for item in matches: + best = portage.versions.catpkgsplit(item) + if not best: + continue + + kernel = Kernel(best[1].replace('-sources', '')) + kernel.version = best[2] + kernel.revision = best[3] + + versions.append(kernel) + + return versions + |