diff options
author | 2011-01-18 14:13:31 +0100 | |
---|---|---|
committer | 2011-01-18 14:13:31 +0100 | |
commit | cd173a7f26ee3df1e038c131a3270036d7f561d0 (patch) | |
tree | 9c7db1502305a2419243f9a8e852c2323f2b1088 /_pytest | |
parent | Fix test_compile_framework_vref on 64-bit. (diff) | |
download | pypy-cd173a7f26ee3df1e038c131a3270036d7f561d0.tar.gz pypy-cd173a7f26ee3df1e038c131a3270036d7f561d0.tar.bz2 pypy-cd173a7f26ee3df1e038c131a3270036d7f561d0.zip |
remove old py copy, add current pytest and py lib snapshots (from pytest-2.0.1dev and py-1.4.1dev)
and some initial tweeks to conftest.py
Diffstat (limited to '_pytest')
-rw-r--r-- | _pytest/__init__.py | 1 | ||||
-rw-r--r-- | _pytest/assertion.py | 168 | ||||
-rw-r--r-- | _pytest/capture.py | 228 | ||||
-rw-r--r-- | _pytest/config.py | 434 | ||||
-rw-r--r-- | _pytest/core.py | 444 | ||||
-rw-r--r-- | _pytest/doctest.py | 87 | ||||
-rwxr-xr-x | _pytest/genscript.py | 73 | ||||
-rw-r--r-- | _pytest/helpconfig.py | 182 | ||||
-rw-r--r-- | _pytest/hookspec.py | 222 | ||||
-rw-r--r-- | _pytest/junitxml.py | 173 | ||||
-rw-r--r-- | _pytest/main.py | 517 | ||||
-rw-r--r-- | _pytest/mark.py | 176 | ||||
-rw-r--r-- | _pytest/monkeypatch.py | 103 | ||||
-rw-r--r-- | _pytest/nose.py | 47 | ||||
-rw-r--r-- | _pytest/pastebin.py | 63 | ||||
-rw-r--r-- | _pytest/pdb.py | 76 | ||||
-rw-r--r-- | _pytest/pytester.py | 674 | ||||
-rw-r--r-- | _pytest/python.py | 855 | ||||
-rw-r--r-- | _pytest/recwarn.py | 96 | ||||
-rw-r--r-- | _pytest/resultlog.py | 93 | ||||
-rw-r--r-- | _pytest/runner.py | 390 | ||||
-rw-r--r-- | _pytest/skipping.py | 213 | ||||
-rwxr-xr-x | _pytest/standalonetemplate.py | 63 | ||||
-rw-r--r-- | _pytest/terminal.py | 467 | ||||
-rw-r--r-- | _pytest/tmpdir.py | 71 | ||||
-rw-r--r-- | _pytest/unittest.py | 139 |
26 files changed, 6055 insertions, 0 deletions
diff --git a/_pytest/__init__.py b/_pytest/__init__.py new file mode 100644 index 0000000000..792d600548 --- /dev/null +++ b/_pytest/__init__.py @@ -0,0 +1 @@ +# diff --git a/_pytest/assertion.py b/_pytest/assertion.py new file mode 100644 index 0000000000..8f09e66612 --- /dev/null +++ b/_pytest/assertion.py @@ -0,0 +1,168 @@ +""" +support for presented detailed information in failing assertions. +""" +import py +import sys +from _pytest.monkeypatch import monkeypatch + +def pytest_addoption(parser): + group = parser.getgroup("debugconfig") + group._addoption('--no-assert', action="store_true", default=False, + dest="noassert", + help="disable python assert expression reinterpretation."), + +def pytest_configure(config): + # The _pytesthook attribute on the AssertionError is used by + # py._code._assertionnew to detect this plugin was loaded and in + # turn call the hooks defined here as part of the + # DebugInterpreter. + config._monkeypatch = m = monkeypatch() + warn_about_missing_assertion() + if not config.getvalue("noassert") and not config.getvalue("nomagic"): + def callbinrepr(op, left, right): + hook_result = config.hook.pytest_assertrepr_compare( + config=config, op=op, left=left, right=right) + for new_expl in hook_result: + if new_expl: + return '\n~'.join(new_expl) + m.setattr(py.builtin.builtins, + 'AssertionError', py.code._AssertionError) + m.setattr(py.code, '_reprcompare', callbinrepr) + +def pytest_unconfigure(config): + config._monkeypatch.undo() + +def warn_about_missing_assertion(): + try: + assert False + except AssertionError: + pass + else: + sys.stderr.write("WARNING: failing tests may report as passing because " + "assertions are turned off! (are you using python -O?)\n") + +# Provide basestring in python3 +try: + basestring = basestring +except NameError: + basestring = str + + +def pytest_assertrepr_compare(op, left, right): + """return specialised explanations for some operators/operands""" + width = 80 - 15 - len(op) - 2 # 15 chars indentation, 1 space around op + left_repr = py.io.saferepr(left, maxsize=width/2) + right_repr = py.io.saferepr(right, maxsize=width-len(left_repr)) + summary = '%s %s %s' % (left_repr, op, right_repr) + + issequence = lambda x: isinstance(x, (list, tuple)) + istext = lambda x: isinstance(x, basestring) + isdict = lambda x: isinstance(x, dict) + isset = lambda x: isinstance(x, set) + + explanation = None + try: + if op == '==': + if istext(left) and istext(right): + explanation = _diff_text(left, right) + elif issequence(left) and issequence(right): + explanation = _compare_eq_sequence(left, right) + elif isset(left) and isset(right): + explanation = _compare_eq_set(left, right) + elif isdict(left) and isdict(right): + explanation = _diff_text(py.std.pprint.pformat(left), + py.std.pprint.pformat(right)) + elif op == 'not in': + if istext(left) and istext(right): + explanation = _notin_text(left, right) + except py.builtin._sysex: + raise + except: + excinfo = py.code.ExceptionInfo() + explanation = ['(pytest_assertion plugin: representation of ' + 'details failed. Probably an object has a faulty __repr__.)', + str(excinfo) + ] + + + if not explanation: + return None + + # Don't include pageloads of data, should be configurable + if len(''.join(explanation)) > 80*8: + explanation = ['Detailed information too verbose, truncated'] + + return [summary] + explanation + + +def _diff_text(left, right): + """Return the explanation for the diff between text + + This will skip leading and trailing characters which are + identical to keep the diff minimal. + """ + explanation = [] + i = 0 # just in case left or right has zero length + for i in range(min(len(left), len(right))): + if left[i] != right[i]: + break + if i > 42: + i -= 10 # Provide some context + explanation = ['Skipping %s identical ' + 'leading characters in diff' % i] + left = left[i:] + right = right[i:] + if len(left) == len(right): + for i in range(len(left)): + if left[-i] != right[-i]: + break + if i > 42: + i -= 10 # Provide some context + explanation += ['Skipping %s identical ' + 'trailing characters in diff' % i] + left = left[:-i] + right = right[:-i] + explanation += [line.strip('\n') + for line in py.std.difflib.ndiff(left.splitlines(), + right.splitlines())] + return explanation + + +def _compare_eq_sequence(left, right): + explanation = [] + for i in range(min(len(left), len(right))): + if left[i] != right[i]: + explanation += ['At index %s diff: %r != %r' % + (i, left[i], right[i])] + break + if len(left) > len(right): + explanation += ['Left contains more items, ' + 'first extra item: %s' % py.io.saferepr(left[len(right)],)] + elif len(left) < len(right): + explanation += ['Right contains more items, ' + 'first extra item: %s' % py.io.saferepr(right[len(left)],)] + return explanation # + _diff_text(py.std.pprint.pformat(left), + # py.std.pprint.pformat(right)) + + +def _compare_eq_set(left, right): + explanation = [] + diff_left = left - right + diff_right = right - left + if diff_left: + explanation.append('Extra items in the left set:') + for item in diff_left: + explanation.append(py.io.saferepr(item)) + if diff_right: + explanation.append('Extra items in the right set:') + for item in diff_right: + explanation.append(py.io.saferepr(item)) + return explanation + + +def _notin_text(term, text): + index = text.find(term) + head = text[:index] + tail = text[index+len(term):] + correct_text = head + tail + return _diff_text(correct_text, text) diff --git a/_pytest/capture.py b/_pytest/capture.py new file mode 100644 index 0000000000..2da398383d --- /dev/null +++ b/_pytest/capture.py @@ -0,0 +1,228 @@ +""" per-test stdout/stderr capturing mechanisms, ``capsys`` and ``capfd`` function arguments. """ + +import pytest, py +import os + +def pytest_addoption(parser): + group = parser.getgroup("general") + group._addoption('--capture', action="store", default=None, + metavar="method", type="choice", choices=['fd', 'sys', 'no'], + help="per-test capturing method: one of fd (default)|sys|no.") + group._addoption('-s', action="store_const", const="no", dest="capture", + help="shortcut for --capture=no.") + +def addouterr(rep, outerr): + repr = getattr(rep, 'longrepr', None) + if not hasattr(repr, 'addsection'): + return + for secname, content in zip(["out", "err"], outerr): + if content: + repr.addsection("Captured std%s" % secname, content.rstrip()) + +def pytest_unconfigure(config): + # registered in config.py during early conftest.py loading + capman = config.pluginmanager.getplugin('capturemanager') + while capman._method2capture: + name, cap = capman._method2capture.popitem() + # XXX logging module may wants to close it itself on process exit + # otherwise we could do finalization here and call "reset()". + cap.suspend() + +class NoCapture: + def startall(self): + pass + def resume(self): + pass + def reset(self): + pass + def suspend(self): + return "", "" + +class CaptureManager: + def __init__(self): + self._method2capture = {} + + def _maketempfile(self): + f = py.std.tempfile.TemporaryFile() + newf = py.io.dupfile(f, encoding="UTF-8") + f.close() + return newf + + def _makestringio(self): + return py.io.TextIO() + + def _getcapture(self, method): + if method == "fd": + return py.io.StdCaptureFD(now=False, + out=self._maketempfile(), err=self._maketempfile() + ) + elif method == "sys": + return py.io.StdCapture(now=False, + out=self._makestringio(), err=self._makestringio() + ) + elif method == "no": + return NoCapture() + else: + raise ValueError("unknown capturing method: %r" % method) + + def _getmethod_preoptionparse(self, args): + if '-s' in args or "--capture=no" in args: + return "no" + elif hasattr(os, 'dup') and '--capture=sys' not in args: + return "fd" + else: + return "sys" + + def _getmethod(self, config, fspath): + if config.option.capture: + method = config.option.capture + else: + try: + method = config._conftest.rget("option_capture", path=fspath) + except KeyError: + method = "fd" + if method == "fd" and not hasattr(os, 'dup'): # e.g. jython + method = "sys" + return method + + def resumecapture_item(self, item): + method = self._getmethod(item.config, item.fspath) + if not hasattr(item, 'outerr'): + item.outerr = ('', '') # we accumulate outerr on the item + return self.resumecapture(method) + + def resumecapture(self, method): + if hasattr(self, '_capturing'): + raise ValueError("cannot resume, already capturing with %r" % + (self._capturing,)) + cap = self._method2capture.get(method) + self._capturing = method + if cap is None: + self._method2capture[method] = cap = self._getcapture(method) + cap.startall() + else: + cap.resume() + + def suspendcapture(self, item=None): + self.deactivate_funcargs() + if hasattr(self, '_capturing'): + method = self._capturing + cap = self._method2capture.get(method) + if cap is not None: + outerr = cap.suspend() + del self._capturing + if item: + outerr = (item.outerr[0] + outerr[0], + item.outerr[1] + outerr[1]) + return outerr + if hasattr(item, 'outerr'): + return item.outerr + return "", "" + + def activate_funcargs(self, pyfuncitem): + if not hasattr(pyfuncitem, 'funcargs'): + return + assert not hasattr(self, '_capturing_funcargs') + self._capturing_funcargs = capturing_funcargs = [] + for name, capfuncarg in pyfuncitem.funcargs.items(): + if name in ('capsys', 'capfd'): + capturing_funcargs.append(capfuncarg) + capfuncarg._start() + + def deactivate_funcargs(self): + capturing_funcargs = getattr(self, '_capturing_funcargs', None) + if capturing_funcargs is not None: + while capturing_funcargs: + capfuncarg = capturing_funcargs.pop() + capfuncarg._finalize() + del self._capturing_funcargs + + def pytest_make_collect_report(self, __multicall__, collector): + method = self._getmethod(collector.config, collector.fspath) + try: + self.resumecapture(method) + except ValueError: + return # recursive collect, XXX refactor capturing + # to allow for more lightweight recursive capturing + try: + rep = __multicall__.execute() + finally: + outerr = self.suspendcapture() + addouterr(rep, outerr) + return rep + + @pytest.mark.tryfirst + def pytest_runtest_setup(self, item): + self.resumecapture_item(item) + + @pytest.mark.tryfirst + def pytest_runtest_call(self, item): + self.resumecapture_item(item) + self.activate_funcargs(item) + + @pytest.mark.tryfirst + def pytest_runtest_teardown(self, item): + self.resumecapture_item(item) + + def pytest__teardown_final(self, __multicall__, session): + method = self._getmethod(session.config, None) + self.resumecapture(method) + try: + rep = __multicall__.execute() + finally: + outerr = self.suspendcapture() + if rep: + addouterr(rep, outerr) + return rep + + def pytest_keyboard_interrupt(self, excinfo): + if hasattr(self, '_capturing'): + self.suspendcapture() + + @pytest.mark.tryfirst + def pytest_runtest_makereport(self, __multicall__, item, call): + self.deactivate_funcargs() + rep = __multicall__.execute() + outerr = self.suspendcapture(item) + if not rep.passed: + addouterr(rep, outerr) + if not rep.passed or rep.when == "teardown": + outerr = ('', '') + item.outerr = outerr + return rep + +def pytest_funcarg__capsys(request): + """captures writes to sys.stdout/sys.stderr and makes + them available successively via a ``capsys.readouterr()`` method + which returns a ``(out, err)`` tuple of captured snapshot strings. + """ + return CaptureFuncarg(py.io.StdCapture) + +def pytest_funcarg__capfd(request): + """captures writes to file descriptors 1 and 2 and makes + snapshotted ``(out, err)`` string tuples available + via the ``capsys.readouterr()`` method. If the underlying + platform does not have ``os.dup`` (e.g. Jython) tests using + this funcarg will automatically skip. + """ + if not hasattr(os, 'dup'): + py.test.skip("capfd funcarg needs os.dup") + return CaptureFuncarg(py.io.StdCaptureFD) + +class CaptureFuncarg: + def __init__(self, captureclass): + self.capture = captureclass(now=False) + + def _start(self): + self.capture.startall() + + def _finalize(self): + if hasattr(self, 'capture'): + self.capture.reset() + del self.capture + + def readouterr(self): + return self.capture.readouterr() + + def close(self): + self._finalize() diff --git a/_pytest/config.py b/_pytest/config.py new file mode 100644 index 0000000000..0c6c2e1628 --- /dev/null +++ b/_pytest/config.py @@ -0,0 +1,434 @@ +""" command line options, ini-file and conftest.py processing. """ + +import py +import sys, os +from _pytest.core import PluginManager +import pytest + +def pytest_cmdline_parse(pluginmanager, args): + config = Config(pluginmanager) + config.parse(args) + if config.option.debug: + config.trace.root.setwriter(sys.stderr.write) + return config + +class Parser: + """ Parser for command line arguments. """ + + def __init__(self, usage=None, processopt=None): + self._anonymous = OptionGroup("custom options", parser=self) + self._groups = [] + self._processopt = processopt + self._usage = usage + self._inidict = {} + self._ininames = [] + self.hints = [] + + def processoption(self, option): + if self._processopt: + if option.dest: + self._processopt(option) + + def addnote(self, note): + self._notes.append(note) + + def getgroup(self, name, description="", after=None): + """ get (or create) a named option Group. + + :name: unique name of the option group. + :description: long description for --help output. + :after: name of other group, used for ordering --help output. + """ + for group in self._groups: + if group.name == name: + return group + group = OptionGroup(name, description, parser=self) + i = 0 + for i, grp in enumerate(self._groups): + if grp.name == after: + break + self._groups.insert(i+1, group) + return group + + def addoption(self, *opts, **attrs): + """ add an optparse-style option. """ + self._anonymous.addoption(*opts, **attrs) + + def parse(self, args): + self.optparser = optparser = MyOptionParser(self) + groups = self._groups + [self._anonymous] + for group in groups: + if group.options: + desc = group.description or group.name + optgroup = py.std.optparse.OptionGroup(optparser, desc) + optgroup.add_options(group.options) + optparser.add_option_group(optgroup) + return self.optparser.parse_args([str(x) for x in args]) + + def parse_setoption(self, args, option): + parsedoption, args = self.parse(args) + for name, value in parsedoption.__dict__.items(): + setattr(option, name, value) + return args + + def addini(self, name, help, type=None, default=None): + """ add an ini-file option with the given name and description. """ + assert type in (None, "pathlist", "args", "linelist") + self._inidict[name] = (help, type, default) + self._ininames.append(name) + +class OptionGroup: + def __init__(self, name, description="", parser=None): + self.name = name + self.description = description + self.options = [] + self.parser = parser + + def addoption(self, *optnames, **attrs): + """ add an option to this group. """ + option = py.std.optparse.Option(*optnames, **attrs) + self._addoption_instance(option, shortupper=False) + + def _addoption(self, *optnames, **attrs): + option = py.std.optparse.Option(*optnames, **attrs) + self._addoption_instance(option, shortupper=True) + + def _addoption_instance(self, option, shortupper=False): + if not shortupper: + for opt in option._short_opts: + if opt[0] == '-' and opt[1].islower(): + raise ValueError("lowercase shortoptions reserved") + if self.parser: + self.parser.processoption(option) + self.options.append(option) + + +class MyOptionParser(py.std.optparse.OptionParser): + def __init__(self, parser): + self._parser = parser + py.std.optparse.OptionParser.__init__(self, usage=parser._usage, + add_help_option=False) + def format_epilog(self, formatter): + hints = self._parser.hints + if hints: + s = "\n".join(["hint: " + x for x in hints]) + "\n" + s = "\n" + s + "\n" + return s + return "" + +class Conftest(object): + """ the single place for accessing values and interacting + towards conftest modules from py.test objects. + """ + def __init__(self, onimport=None, confcutdir=None): + self._path2confmods = {} + self._onimport = onimport + self._conftestpath2mod = {} + self._confcutdir = confcutdir + + def setinitial(self, args): + """ try to find a first anchor path for looking up global values + from conftests. This function is usually called _before_ + argument parsing. conftest files may add command line options + and we thus have no completely safe way of determining + which parts of the arguments are actually related to options + and which are file system paths. We just try here to get + bootstrapped ... + """ + current = py.path.local() + opt = '--confcutdir' + for i in range(len(args)): + opt1 = str(args[i]) + if opt1.startswith(opt): + if opt1 == opt: + if len(args) > i: + p = current.join(args[i+1], abs=True) + elif opt1.startswith(opt + "="): + p = current.join(opt1[len(opt)+1:], abs=1) + self._confcutdir = p + break + for arg in args + [current]: + if hasattr(arg, 'startswith') and arg.startswith("--"): + continue + anchor = current.join(arg, abs=1) + if anchor.check(): # we found some file object + self._path2confmods[None] = self.getconftestmodules(anchor) + # let's also consider test* dirs + if anchor.check(dir=1): + for x in anchor.listdir("test*"): + if x.check(dir=1): + self.getconftestmodules(x) + break + else: + assert 0, "no root of filesystem?" + + def getconftestmodules(self, path): + """ return a list of imported conftest modules for the given path. """ + try: + clist = self._path2confmods[path] + except KeyError: + if path is None: + raise ValueError("missing default confest.") + dp = path.dirpath() + clist = [] + if dp != path: + cutdir = self._confcutdir + if cutdir and path != cutdir and not path.relto(cutdir): + pass + else: + conftestpath = path.join("conftest.py") + if conftestpath.check(file=1): + clist.append(self.importconftest(conftestpath)) + clist[:0] = self.getconftestmodules(dp) + self._path2confmods[path] = clist + # be defensive: avoid changes from caller side to + # affect us by always returning a copy of the actual list + return clist[:] + + def rget(self, name, path=None): + mod, value = self.rget_with_confmod(name, path) + return value + + def rget_with_confmod(self, name, path=None): + modules = self.getconftestmodules(path) + modules.reverse() + for mod in modules: + try: + return mod, getattr(mod, name) + except AttributeError: + continue + raise KeyError(name) + + def importconftest(self, conftestpath): + assert conftestpath.check(), conftestpath + try: + return self._conftestpath2mod[conftestpath] + except KeyError: + pkgpath = conftestpath.pypkgpath() + if pkgpath is None: + _ensure_removed_sysmodule(conftestpath.purebasename) + self._conftestpath2mod[conftestpath] = mod = conftestpath.pyimport() + dirpath = conftestpath.dirpath() + if dirpath in self._path2confmods: + for path, mods in self._path2confmods.items(): + if path and path.relto(dirpath) or path == dirpath: + assert mod not in mods + mods.append(mod) + self._postimport(mod) + return mod + + def _postimport(self, mod): + if self._onimport: + self._onimport(mod) + return mod + +def _ensure_removed_sysmodule(modname): + try: + del sys.modules[modname] + except KeyError: + pass + +class CmdOptions(object): + """ holds cmdline options as attributes.""" + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + def __repr__(self): + return "<CmdOptions %r>" %(self.__dict__,) + +class Config(object): + """ access to configuration values, pluginmanager and plugin hooks. """ + def __init__(self, pluginmanager=None): + #: command line option values, usually added via parser.addoption(...) + #: or parser.getgroup(...).addoption(...) calls + self.option = CmdOptions() + self._parser = Parser( + usage="usage: %prog [options] [file_or_dir] [file_or_dir] [...]", + processopt=self._processopt, + ) + #: a pluginmanager instance + self.pluginmanager = pluginmanager or PluginManager(load=True) + self.trace = self.pluginmanager.trace.root.get("config") + self._conftest = Conftest(onimport=self._onimportconftest) + self.hook = self.pluginmanager.hook + self._inicache = {} + + def _onimportconftest(self, conftestmodule): + self.trace("loaded conftestmodule %r" %(conftestmodule,)) + self.pluginmanager.consider_conftest(conftestmodule) + + def _processopt(self, opt): + if hasattr(opt, 'default') and opt.dest: + if not hasattr(self.option, opt.dest): + setattr(self.option, opt.dest, opt.default) + + def _getmatchingplugins(self, fspath): + allconftests = self._conftest._conftestpath2mod.values() + plugins = [x for x in self.pluginmanager.getplugins() + if x not in allconftests] + plugins += self._conftest.getconftestmodules(fspath) + return plugins + + def _setinitialconftest(self, args): + # capture output during conftest init (#issue93) + from _pytest.capture import CaptureManager + capman = CaptureManager() + self.pluginmanager.register(capman, 'capturemanager') + # will be unregistered in capture.py's unconfigure() + capman.resumecapture(capman._getmethod_preoptionparse(args)) + try: + try: + self._conftest.setinitial(args) + finally: + out, err = capman.suspendcapture() # logging might have got it + except: + sys.stdout.write(out) + sys.stderr.write(err) + raise + + def _initini(self, args): + self.inicfg = getcfg(args, ["pytest.ini", "tox.ini", "setup.cfg"]) + self._parser.addini('addopts', 'extra command line options', 'args') + self._parser.addini('minversion', 'minimally required pytest version') + + def _preparse(self, args, addopts=True): + self._initini(args) + if addopts: + args[:] = self.getini("addopts") + args + self._checkversion() + self.pluginmanager.consider_preparse(args) + self.pluginmanager.consider_setuptools_entrypoints() + self.pluginmanager.consider_env() + self._setinitialconftest(args) + self.pluginmanager.do_addoption(self._parser) + if addopts: + self.hook.pytest_cmdline_preparse(config=self, args=args) + + def _checkversion(self): + minver = self.inicfg.get('minversion', None) + if minver: + ver = minver.split(".") + myver = pytest.__version__.split(".") + if myver < ver: + raise pytest.UsageError( + "%s:%d: requires pytest-%s, actual pytest-%s'" %( + self.inicfg.config.path, self.inicfg.lineof('minversion'), + minver, pytest.__version__)) + + def parse(self, args): + # parse given cmdline arguments into this config object. + # Note that this can only be called once per testing process. + assert not hasattr(self, 'args'), ( + "can only parse cmdline args at most once per Config object") + self._preparse(args) + self._parser.hints.extend(self.pluginmanager._hints) + args = self._parser.parse_setoption(args, self.option) + if not args: + args.append(py.std.os.getcwd()) + self.args = args + + def getini(self, name): + """ return configuration value from an ini file. If the + specified name hasn't been registered through a prior ``parse.addini`` + call (usually from a plugin), a ValueError is raised. """ + try: + return self._inicache[name] + except KeyError: + self._inicache[name] = val = self._getini(name) + return val + + def _getini(self, name): + try: + description, type, default = self._parser._inidict[name] + except KeyError: + raise ValueError("unknown configuration value: %r" %(name,)) + try: + value = self.inicfg[name] + except KeyError: + if default is not None: + return default + if type is None: + return '' + return [] + if type == "pathlist": + dp = py.path.local(self.inicfg.config.path).dirpath() + l = [] + for relpath in py.std.shlex.split(value): + l.append(dp.join(relpath, abs=True)) + return l + elif type == "args": + return py.std.shlex.split(value) + elif type == "linelist": + return [t for t in map(lambda x: x.strip(), value.split("\n")) if t] + else: + assert type is None + return value + + def _getconftest_pathlist(self, name, path=None): + try: + mod, relroots = self._conftest.rget_with_confmod(name, path) + except KeyError: + return None + modpath = py.path.local(mod.__file__).dirpath() + l = [] + for relroot in relroots: + if not isinstance(relroot, py.path.local): + relroot = relroot.replace("/", py.path.local.sep) + relroot = modpath.join(relroot, abs=True) + l.append(relroot) + return l + + def _getconftest(self, name, path=None, check=False): + if check: + self._checkconftest(name) + return self._conftest.rget(name, path) + + def getvalue(self, name, path=None): + """ return ``name`` value looked set from command line options. + + (deprecated) if we can't find the option also lookup + the name in a matching conftest file. + """ + try: + return getattr(self.option, name) + except AttributeError: + return self._getconftest(name, path, check=False) + + def getvalueorskip(self, name, path=None): + """ (deprecated) return getvalue(name) or call + py.test.skip if no value exists. """ + __tracebackhide__ = True + try: + val = self.getvalue(name, path) + if val is None: + raise KeyError(name) + return val + except KeyError: + py.test.skip("no %r value found" %(name,)) + + +def getcfg(args, inibasenames): + args = [x for x in args if str(x)[0] != "-"] + if not args: + args = [py.path.local()] + for arg in args: + arg = py.path.local(arg) + for base in arg.parts(reverse=True): + for inibasename in inibasenames: + p = base.join(inibasename) + if p.check(): + iniconfig = py.iniconfig.IniConfig(p) + if 'pytest' in iniconfig.sections: + return iniconfig['pytest'] + return {} + +def findupwards(current, basename): + current = py.path.local(current) + while 1: + p = current.join(basename) + if p.check(): + return p + p = current.dirpath() + if p == current: + return + current = p + diff --git a/_pytest/core.py b/_pytest/core.py new file mode 100644 index 0000000000..7a67e7cbdf --- /dev/null +++ b/_pytest/core.py @@ -0,0 +1,444 @@ +""" +pytest PluginManager, basic initialization and tracing. +(c) Holger Krekel 2004-2010 +""" +import sys, os +import inspect +import py +from _pytest import hookspec # the extension point definitions + +assert py.__version__.split(".")[:2] >= ['1', '4'], ("installation problem: " + "%s is too old, remove or upgrade 'py'" % (py.__version__)) + +default_plugins = ( + "config mark main terminal runner python pdb unittest capture skipping " + "tmpdir monkeypatch recwarn pastebin helpconfig nose assertion genscript " + "junitxml resultlog doctest").split() + +class TagTracer: + def __init__(self, prefix="[pytest] "): + self._tag2proc = {} + self.writer = None + self.indent = 0 + self.prefix = prefix + + def get(self, name): + return TagTracerSub(self, (name,)) + + def processmessage(self, tags, args): + if self.writer is not None: + if args: + indent = " " * self.indent + content = " ".join(map(str, args)) + self.writer("%s%s%s\n" %(self.prefix, indent, content)) + try: + self._tag2proc[tags](tags, args) + except KeyError: + pass + + def setwriter(self, writer): + self.writer = writer + + def setprocessor(self, tags, processor): + if isinstance(tags, str): + tags = tuple(tags.split(":")) + else: + assert isinstance(tags, tuple) + self._tag2proc[tags] = processor + +class TagTracerSub: + def __init__(self, root, tags): + self.root = root + self.tags = tags + def __call__(self, *args): + self.root.processmessage(self.tags, args) + def setmyprocessor(self, processor): + self.root.setprocessor(self.tags, processor) + def get(self, name): + return self.__class__(self.root, self.tags + (name,)) + +class PluginManager(object): + def __init__(self, load=False): + self._name2plugin = {} + self._plugins = [] + self._hints = [] + self.trace = TagTracer().get("pluginmanage") + self._plugin_distinfo = [] + if os.environ.get('PYTEST_DEBUG'): + err = sys.stderr + encoding = getattr(err, 'encoding', 'utf8') + try: + err = py.io.dupfile(err, encoding=encoding) + except Exception: + pass + self.trace.root.setwriter(err.write) + self.hook = HookRelay([hookspec], pm=self) + self.register(self) + if load: + for spec in default_plugins: + self.import_plugin(spec) + + def register(self, plugin, name=None, prepend=False): + assert not self.isregistered(plugin), plugin + name = name or getattr(plugin, '__name__', str(id(plugin))) + if name in self._name2plugin: + return False + #self.trace("registering", name, plugin) + self._name2plugin[name] = plugin + self.call_plugin(plugin, "pytest_addhooks", {'pluginmanager': self}) + self.hook.pytest_plugin_registered(manager=self, plugin=plugin) + if not prepend: + self._plugins.append(plugin) + else: + self._plugins.insert(0, plugin) + return True + + def unregister(self, plugin=None, name=None): + if plugin is None: + plugin = self.getplugin(name=name) + self._plugins.remove(plugin) + self.hook.pytest_plugin_unregistered(plugin=plugin) + for name, value in list(self._name2plugin.items()): + if value == plugin: + del self._name2plugin[name] + + def isregistered(self, plugin, name=None): + if self.getplugin(name) is not None: + return True + for val in self._name2plugin.values(): + if plugin == val: + return True + + def addhooks(self, spec): + self.hook._addhooks(spec, prefix="pytest_") + + def getplugins(self): + return list(self._plugins) + + def skipifmissing(self, name): + if not self.hasplugin(name): + py.test.skip("plugin %r is missing" % name) + + def hasplugin(self, name): + return bool(self.getplugin(name)) + + def getplugin(self, name): + if name is None: + return None + try: + return self._name2plugin[name] + except KeyError: + return self._name2plugin.get("_pytest." + name, None) + + # API for bootstrapping + # + def _envlist(self, varname): + val = py.std.os.environ.get(varname, None) + if val is not None: + return val.split(',') + return () + + def consider_env(self): + for spec in self._envlist("PYTEST_PLUGINS"): + self.import_plugin(spec) + + def consider_setuptools_entrypoints(self): + try: + from pkg_resources import iter_entry_points, DistributionNotFound + except ImportError: + return # XXX issue a warning + for ep in iter_entry_points('pytest11'): + name = ep.name + if name.startswith("pytest_"): + name = name[7:] + if ep.name in self._name2plugin or name in self._name2plugin: + continue + try: + plugin = ep.load() + except DistributionNotFound: + continue + self._plugin_distinfo.append((ep.dist, plugin)) + self.register(plugin, name=name) + + def consider_preparse(self, args): + for opt1,opt2 in zip(args, args[1:]): + if opt1 == "-p": + if opt2.startswith("no:"): + name = opt2[3:] + if self.getplugin(name) is not None: + self.unregister(None, name=name) + self._name2plugin[name] = -1 + else: + if self.getplugin(opt2) is None: + self.import_plugin(opt2) + + def consider_conftest(self, conftestmodule): + if self.register(conftestmodule, name=conftestmodule.__file__): + self.consider_module(conftestmodule) + + def consider_module(self, mod): + attr = getattr(mod, "pytest_plugins", ()) + if attr: + if not isinstance(attr, (list, tuple)): + attr = (attr,) + for spec in attr: + self.import_plugin(spec) + + def import_plugin(self, modname): + assert isinstance(modname, str) + if self.getplugin(modname) is not None: + return + try: + #self.trace("importing", modname) + mod = importplugin(modname) + except KeyboardInterrupt: + raise + except ImportError: + if modname.startswith("pytest_"): + return self.import_plugin(modname[7:]) + raise + except: + e = py.std.sys.exc_info()[1] + if not hasattr(py.test, 'skip'): + raise + elif not isinstance(e, py.test.skip.Exception): + raise + self._hints.append("skipped plugin %r: %s" %((modname, e.msg))) + else: + self.register(mod, modname) + self.consider_module(mod) + + def pytest_plugin_registered(self, plugin): + import pytest + dic = self.call_plugin(plugin, "pytest_namespace", {}) or {} + if dic: + self._setns(pytest, dic) + if hasattr(self, '_config'): + self.call_plugin(plugin, "pytest_addoption", + {'parser': self._config._parser}) + self.call_plugin(plugin, "pytest_configure", + {'config': self._config}) + + def _setns(self, obj, dic): + import pytest + for name, value in dic.items(): + if isinstance(value, dict): + mod = getattr(obj, name, None) + if mod is None: + modname = "pytest.%s" % name + mod = py.std.types.ModuleType(modname) + sys.modules[modname] = mod + mod.__all__ = [] + setattr(obj, name, mod) + obj.__all__.append(name) + self._setns(mod, value) + else: + setattr(obj, name, value) + obj.__all__.append(name) + #if obj != pytest: + # pytest.__all__.append(name) + setattr(pytest, name, value) + + def pytest_terminal_summary(self, terminalreporter): + tw = terminalreporter._tw + if terminalreporter.config.option.traceconfig: + for hint in self._hints: + tw.line("hint: %s" % hint) + + def do_addoption(self, parser): + mname = "pytest_addoption" + methods = reversed(self.listattr(mname)) + MultiCall(methods, {'parser': parser}).execute() + + def do_configure(self, config): + assert not hasattr(self, '_config') + self._config = config + config.hook.pytest_configure(config=self._config) + + def do_unconfigure(self, config): + config = self._config + del self._config + config.hook.pytest_unconfigure(config=config) + config.pluginmanager.unregister(self) + + def notify_exception(self, excinfo): + excrepr = excinfo.getrepr(funcargs=True, showlocals=True) + res = self.hook.pytest_internalerror(excrepr=excrepr) + if not py.builtin.any(res): + for line in str(excrepr).split("\n"): + sys.stderr.write("INTERNALERROR> %s\n" %line) + sys.stderr.flush() + + def listattr(self, attrname, plugins=None): + if plugins is None: + plugins = self._plugins + l = [] + last = [] + for plugin in plugins: + try: + meth = getattr(plugin, attrname) + if hasattr(meth, 'tryfirst'): + last.append(meth) + elif hasattr(meth, 'trylast'): + l.insert(0, meth) + else: + l.append(meth) + except AttributeError: + continue + l.extend(last) + return l + + def call_plugin(self, plugin, methname, kwargs): + return MultiCall(methods=self.listattr(methname, plugins=[plugin]), + kwargs=kwargs, firstresult=True).execute() + + +def importplugin(importspec): + name = importspec + try: + mod = "_pytest." + name + return __import__(mod, None, None, '__doc__') + except ImportError: + #e = py.std.sys.exc_info()[1] + #if str(e).find(name) == -1: + # raise + pass # + return __import__(importspec, None, None, '__doc__') + +class MultiCall: + """ execute a call into multiple python functions/methods. """ + def __init__(self, methods, kwargs, firstresult=False): + self.methods = list(methods) + self.kwargs = kwargs + self.results = [] + self.firstresult = firstresult + + def __repr__(self): + status = "%d results, %d meths" % (len(self.results), len(self.methods)) + return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs) + + def execute(self): + while self.methods: + method = self.methods.pop() + kwargs = self.getkwargs(method) + res = method(**kwargs) + if res is not None: + self.results.append(res) + if self.firstresult: + return res + if not self.firstresult: + return self.results + + def getkwargs(self, method): + kwargs = {} + for argname in varnames(method): + try: + kwargs[argname] = self.kwargs[argname] + except KeyError: + if argname == "__multicall__": + kwargs[argname] = self + return kwargs + +def varnames(func): + if not inspect.isfunction(func) and not inspect.ismethod(func): + func = getattr(func, '__call__', func) + ismethod = inspect.ismethod(func) + rawcode = py.code.getrawcode(func) + try: + return rawcode.co_varnames[ismethod:rawcode.co_argcount] + except AttributeError: + return () + +class HookRelay: + def __init__(self, hookspecs, pm, prefix="pytest_"): + if not isinstance(hookspecs, list): + hookspecs = [hookspecs] + self._hookspecs = [] + self._pm = pm + self.trace = pm.trace.root.get("hook") + for hookspec in hookspecs: + self._addhooks(hookspec, prefix) + + def _addhooks(self, hookspecs, prefix): + self._hookspecs.append(hookspecs) + added = False + for name, method in vars(hookspecs).items(): + if name.startswith(prefix): + firstresult = getattr(method, 'firstresult', False) + hc = HookCaller(self, name, firstresult=firstresult) + setattr(self, name, hc) + added = True + #print ("setting new hook", name) + if not added: + raise ValueError("did not find new %r hooks in %r" %( + prefix, hookspecs,)) + + +class HookCaller: + def __init__(self, hookrelay, name, firstresult): + self.hookrelay = hookrelay + self.name = name + self.firstresult = firstresult + self.trace = self.hookrelay.trace + + def __repr__(self): + return "<HookCaller %r>" %(self.name,) + + def __call__(self, **kwargs): + methods = self.hookrelay._pm.listattr(self.name) + return self._docall(methods, kwargs) + + def pcall(self, plugins, **kwargs): + methods = self.hookrelay._pm.listattr(self.name, plugins=plugins) + return self._docall(methods, kwargs) + + def _docall(self, methods, kwargs): + self.trace(self.name, kwargs) + self.trace.root.indent += 1 + mc = MultiCall(methods, kwargs, firstresult=self.firstresult) + try: + res = mc.execute() + if res: + self.trace("finish", self.name, "-->", res) + finally: + self.trace.root.indent -= 1 + return res + +_preinit = [] + +def _preloadplugins(): + _preinit.append(PluginManager(load=True)) + +def main(args=None, plugins=None): + """ returned exit code integer, after an in-process testing run + with the given command line arguments, preloading an optional list + of passed in plugin objects. """ + if args is None: + args = sys.argv[1:] + elif isinstance(args, py.path.local): + args = [str(args)] + elif not isinstance(args, (tuple, list)): + if not isinstance(args, str): + raise ValueError("not a string or argument list: %r" % (args,)) + args = py.std.shlex.split(args) + if _preinit: + _pluginmanager = _preinit.pop(0) + else: # subsequent calls to main will create a fresh instance + _pluginmanager = PluginManager(load=True) + hook = _pluginmanager.hook + try: + if plugins: + for plugin in plugins: + _pluginmanager.register(plugin) + config = hook.pytest_cmdline_parse( + pluginmanager=_pluginmanager, args=args) + exitstatus = hook.pytest_cmdline_main(config=config) + except UsageError: + e = sys.exc_info()[1] + sys.stderr.write("ERROR: %s\n" %(e.args[0],)) + exitstatus = 3 + return exitstatus + +class UsageError(Exception): + """ error in py.test usage or invocation""" + diff --git a/_pytest/doctest.py b/_pytest/doctest.py new file mode 100644 index 0000000000..1378544ba5 --- /dev/null +++ b/_pytest/doctest.py @@ -0,0 +1,87 @@ +""" discover and run doctests in modules and test files.""" + +import pytest, py +from py._code.code import TerminalRepr, ReprFileLocation + +def pytest_addoption(parser): + group = parser.getgroup("collect") + group.addoption("--doctest-modules", + action="store_true", default=False, + help="run doctests in all .py modules", + dest="doctestmodules") + group.addoption("--doctest-glob", + action="store", default="test*.txt", metavar="pat", + help="doctests file matching pattern, default: test*.txt", + dest="doctestglob") + +def pytest_collect_file(path, parent): + config = parent.config + if path.ext == ".py": + if config.option.doctestmodules: + return DoctestModule(path, parent) + elif (path.ext in ('.txt', '.rst') and parent.session.isinitpath(path)) or \ + path.check(fnmatch=config.getvalue("doctestglob")): + return DoctestTextfile(path, parent) + +class ReprFailDoctest(TerminalRepr): + def __init__(self, reprlocation, lines): + self.reprlocation = reprlocation + self.lines = lines + def toterminal(self, tw): + for line in self.lines: + tw.line(line) + self.reprlocation.toterminal(tw) + +class DoctestItem(pytest.Item): + def repr_failure(self, excinfo): + doctest = py.std.doctest + if excinfo.errisinstance((doctest.DocTestFailure, + doctest.UnexpectedException)): + doctestfailure = excinfo.value + example = doctestfailure.example + test = doctestfailure.test + filename = test.filename + lineno = test.lineno + example.lineno + 1 + message = excinfo.type.__name__ + reprlocation = ReprFileLocation(filename, lineno, message) + checker = py.std.doctest.OutputChecker() + REPORT_UDIFF = py.std.doctest.REPORT_UDIFF + filelines = py.path.local(filename).readlines(cr=0) + i = max(test.lineno, max(0, lineno - 10)) # XXX? + lines = [] + for line in filelines[i:lineno]: + lines.append("%03d %s" % (i+1, line)) + i += 1 + if excinfo.errisinstance(doctest.DocTestFailure): + lines += checker.output_difference(example, + doctestfailure.got, REPORT_UDIFF).split("\n") + else: + inner_excinfo = py.code.ExceptionInfo(excinfo.value.exc_info) + lines += ["UNEXPECTED EXCEPTION: %s" % + repr(inner_excinfo.value)] + + return ReprFailDoctest(reprlocation, lines) + else: + return super(DoctestItem, self).repr_failure(excinfo) + + def reportinfo(self): + return self.fspath, None, "[doctest]" + +class DoctestTextfile(DoctestItem, pytest.File): + def runtest(self): + doctest = py.std.doctest + failed, tot = doctest.testfile( + str(self.fspath), module_relative=False, + optionflags=doctest.ELLIPSIS, + raise_on_error=True, verbose=0) + +class DoctestModule(DoctestItem, pytest.File): + def runtest(self): + doctest = py.std.doctest + if self.fspath.basename == "conftest.py": + module = self.config._conftest.importconftest(self.fspath) + else: + module = self.fspath.pyimport() + failed, tot = doctest.testmod( + module, raise_on_error=True, verbose=0, + optionflags=doctest.ELLIPSIS) diff --git a/_pytest/genscript.py b/_pytest/genscript.py new file mode 100755 index 0000000000..8cb9c2f3a3 --- /dev/null +++ b/_pytest/genscript.py @@ -0,0 +1,73 @@ +""" generate a single-file self-contained version of py.test """ +import py +import pickle +import zlib +import base64 + +def find_toplevel(name): + for syspath in py.std.sys.path: + base = py.path.local(syspath) + lib = base/name + if lib.check(dir=1): + return lib + mod = base.join("%s.py" % name) + if mod.check(file=1): + return mod + raise LookupError(name) + +def pkgname(toplevel, rootpath, path): + parts = path.parts()[len(rootpath.parts()):] + return '.'.join([toplevel] + [x.purebasename for x in parts]) + +def pkg_to_mapping(name): + toplevel = find_toplevel(name) + name2src = {} + if toplevel.check(file=1): # module + name2src[toplevel.purebasename] = toplevel.read() + else: # package + for pyfile in toplevel.visit('*.py'): + pkg = pkgname(name, toplevel, pyfile) + name2src[pkg] = pyfile.read() + return name2src + +def compress_mapping(mapping): + data = pickle.dumps(mapping, 2) + data = zlib.compress(data, 9) + data = base64.encodestring(data) + data = data.decode('ascii') + return data + + +def compress_packages(names): + mapping = {} + for name in names: + mapping.update(pkg_to_mapping(name)) + return compress_mapping(mapping) + + +def generate_script(entry, packages): + data = compress_packages(packages) + tmpl = py.path.local(__file__).dirpath().join('standalonetemplate.py') + exe = tmpl.read() + exe = exe.replace('@SOURCES@', data) + exe = exe.replace('@ENTRY@', entry) + return exe + + +def pytest_addoption(parser): + group = parser.getgroup("debugconfig") + group.addoption("--genscript", action="store", default=None, + dest="genscript", metavar="path", + help="create standalone py.test script at given target path.") + +def pytest_cmdline_main(config): + genscript = config.getvalue("genscript") + if genscript: + script = generate_script( + 'import py; raise SystemExit(py.test.cmdline.main())', + ['py', '_pytest', 'pytest'], + ) + + genscript = py.path.local(genscript) + genscript.write(script) + return 0 diff --git a/_pytest/helpconfig.py b/_pytest/helpconfig.py new file mode 100644 index 0000000000..0e37affcf5 --- /dev/null +++ b/_pytest/helpconfig.py @@ -0,0 +1,182 @@ +""" version info, help messages, tracing configuration. """ +import py +import pytest +import inspect, sys + +def pytest_addoption(parser): + group = parser.getgroup('debugconfig') + group.addoption('--version', action="store_true", + help="display pytest lib version and import information.") + group._addoption("-h", "--help", action="store_true", dest="help", + help="show help message and configuration info") + group._addoption('-p', action="append", dest="plugins", default = [], + metavar="name", + help="early-load given plugin (multi-allowed).") + group.addoption('--traceconfig', + action="store_true", dest="traceconfig", default=False, + help="trace considerations of conftest.py files."), + group._addoption('--nomagic', + action="store_true", dest="nomagic", default=False, + help="don't reinterpret asserts, no traceback cutting. ") + group.addoption('--debug', + action="store_true", dest="debug", default=False, + help="generate and show internal debugging information.") + + +def pytest_cmdline_main(config): + if config.option.version: + p = py.path.local(pytest.__file__) + sys.stderr.write("This is py.test version %s, imported from %s\n" % + (pytest.__version__, p)) + plugininfo = getpluginversioninfo(config) + if plugininfo: + for line in plugininfo: + sys.stderr.write(line + "\n") + return 0 + elif config.option.help: + config.pluginmanager.do_configure(config) + showhelp(config) + return 0 + +def showhelp(config): + tw = py.io.TerminalWriter() + tw.write(config._parser.optparser.format_help()) + tw.line() + tw.line() + #tw.sep( "=", "config file settings") + tw.line("[pytest] ini-options in the next " + "pytest.ini|tox.ini|setup.cfg file:") + tw.line() + + for name in config._parser._ininames: + help, type, default = config._parser._inidict[name] + if type is None: + type = "string" + spec = "%s (%s)" % (name, type) + line = " %-24s %s" %(spec, help) + tw.line(line[:tw.fullwidth]) + + tw.line() ; tw.line() + #tw.sep("=") + return + + tw.line("conftest.py options:") + tw.line() + conftestitems = sorted(config._parser._conftestdict.items()) + for name, help in conftest_options + conftestitems: + line = " %-15s %s" %(name, help) + tw.line(line[:tw.fullwidth]) + tw.line() + #tw.sep( "=") + +conftest_options = [ + ('pytest_plugins', 'list of plugin names to load'), +] + +def getpluginversioninfo(config): + lines = [] + plugininfo = config.pluginmanager._plugin_distinfo + if plugininfo: + lines.append("setuptools registered plugins:") + for dist, plugin in plugininfo: + loc = getattr(plugin, '__file__', repr(plugin)) + content = "%s-%s at %s" % (dist.project_name, dist.version, loc) + lines.append(" " + content) + return lines + +def pytest_report_header(config): + lines = [] + if config.option.debug or config.option.traceconfig: + lines.append("using: pytest-%s pylib-%s" % + (pytest.__version__,py.__version__)) + + verinfo = getpluginversioninfo(config) + if verinfo: + lines.extend(verinfo) + + if config.option.traceconfig: + lines.append("active plugins:") + plugins = [] + items = config.pluginmanager._name2plugin.items() + for name, plugin in items: + if hasattr(plugin, '__file__'): + r = plugin.__file__ + else: + r = repr(plugin) + lines.append(" %-20s: %s" %(name, r)) + return lines + + +# ===================================================== +# validate plugin syntax and hooks +# ===================================================== + +def pytest_plugin_registered(manager, plugin): + methods = collectattr(plugin) + hooks = {} + for hookspec in manager.hook._hookspecs: + hooks.update(collectattr(hookspec)) + + stringio = py.io.TextIO() + def Print(*args): + if args: + stringio.write(" ".join(map(str, args))) + stringio.write("\n") + + fail = False + while methods: + name, method = methods.popitem() + #print "checking", name + if isgenerichook(name): + continue + if name not in hooks: + if not getattr(method, 'optionalhook', False): + Print("found unknown hook:", name) + fail = True + else: + #print "checking", method + method_args = getargs(method) + #print "method_args", method_args + if '__multicall__' in method_args: + method_args.remove('__multicall__') + hook = hooks[name] + hookargs = getargs(hook) + for arg in method_args: + if arg not in hookargs: + Print("argument %r not available" %(arg, )) + Print("actual definition: %s" %(formatdef(method))) + Print("available hook arguments: %s" % + ", ".join(hookargs)) + fail = True + break + #if not fail: + # print "matching hook:", formatdef(method) + if fail: + name = getattr(plugin, '__name__', plugin) + raise PluginValidationError("%s:\n%s" % (name, stringio.getvalue())) + +class PluginValidationError(Exception): + """ plugin failed validation. """ + +def isgenerichook(name): + return name == "pytest_plugins" or \ + name.startswith("pytest_funcarg__") + +def getargs(func): + args = inspect.getargs(py.code.getrawcode(func))[0] + startindex = inspect.ismethod(func) and 1 or 0 + return args[startindex:] + +def collectattr(obj): + methods = {} + for apiname in dir(obj): + if apiname.startswith("pytest_"): + methods[apiname] = getattr(obj, apiname) + return methods + +def formatdef(func): + return "%s%s" % ( + func.__name__, + inspect.formatargspec(*inspect.getargspec(func)) + ) + diff --git a/_pytest/hookspec.py b/_pytest/hookspec.py new file mode 100644 index 0000000000..580ab27997 --- /dev/null +++ b/_pytest/hookspec.py @@ -0,0 +1,222 @@ +""" hook specifications for pytest plugins, invoked from main.py and builtin plugins. """ + +# ------------------------------------------------------------------------- +# Initialization +# ------------------------------------------------------------------------- + +def pytest_addhooks(pluginmanager): + """called at plugin load time to allow adding new hooks via a call to + pluginmanager.registerhooks(module).""" + + +def pytest_namespace(): + """return dict of name->object to be made globally available in + the py.test/pytest namespace. This hook is called before command + line options are parsed. + """ + +def pytest_cmdline_parse(pluginmanager, args): + """return initialized config object, parsing the specified args. """ +pytest_cmdline_parse.firstresult = True + +def pytest_cmdline_preparse(config, args): + """modify command line arguments before option parsing. """ + +def pytest_addoption(parser): + """add optparse-style options and ini-style config values via calls + to ``parser.addoption`` and ``parser.addini(...)``. + """ + +def pytest_cmdline_main(config): + """ called for performing the main command line action. The default + implementation will invoke the configure hooks and runtest_mainloop. """ +pytest_cmdline_main.firstresult = True + +def pytest_configure(config): + """ called after command line options have been parsed. + and all plugins and initial conftest files been loaded. + """ + +def pytest_unconfigure(config): + """ called before test process is exited. """ + +def pytest_runtestloop(session): + """ called for performing the main runtest loop + (after collection finished). """ +pytest_runtestloop.firstresult = True + +# ------------------------------------------------------------------------- +# collection hooks +# ------------------------------------------------------------------------- + +def pytest_collection(session): + """ perform the collection protocol for the given session. """ +pytest_collection.firstresult = True + +def pytest_collection_modifyitems(session, config, items): + """ called after collection has been performed, may filter or re-order + the items in-place.""" + +def pytest_collection_finish(session): + """ called after collection has been performed and modified. """ + +def pytest_ignore_collect(path, config): + """ return True to prevent considering this path for collection. + This hook is consulted for all files and directories prior to calling + more specific hooks. + """ +pytest_ignore_collect.firstresult = True + +def pytest_collect_directory(path, parent): + """ called before traversing a directory for collection files. """ +pytest_collect_directory.firstresult = True + +def pytest_collect_file(path, parent): + """ return collection Node or None for the given path. Any new node + needs to have the specified ``parent`` as a parent.""" + +# logging hooks for collection +def pytest_collectstart(collector): + """ collector starts collecting. """ + +def pytest_itemcollected(item): + """ we just collected a test item. """ + +def pytest_collectreport(report): + """ collector finished collecting. """ + +def pytest_deselected(items): + """ called for test items deselected by keyword. """ + +def pytest_make_collect_report(collector): + """ perform ``collector.collect()`` and return a CollectReport. """ +pytest_make_collect_report.firstresult = True + +# ------------------------------------------------------------------------- +# Python test function related hooks +# ------------------------------------------------------------------------- + +def pytest_pycollect_makemodule(path, parent): + """ return a Module collector or None for the given path. + This hook will be called for each matching test module path. + The pytest_collect_file hook needs to be used if you want to + create test modules for files that do not match as a test module. + """ +pytest_pycollect_makemodule.firstresult = True + +def pytest_pycollect_makeitem(collector, name, obj): + """ return custom item/collector for a python object in a module, or None. """ +pytest_pycollect_makeitem.firstresult = True + +def pytest_pyfunc_call(pyfuncitem): + """ call underlying test function. """ +pytest_pyfunc_call.firstresult = True + +def pytest_generate_tests(metafunc): + """ generate (multiple) parametrized calls to a test function.""" + +# ------------------------------------------------------------------------- +# generic runtest related hooks +# ------------------------------------------------------------------------- +def pytest_itemstart(item, node=None): + """ (deprecated, use pytest_runtest_logstart). """ + +def pytest_runtest_protocol(item): + """ implements the standard runtest_setup/call/teardown protocol including + capturing exceptions and calling reporting hooks on the results accordingly. + + :return boolean: True if no further hook implementations should be invoked. + """ +pytest_runtest_protocol.firstresult = True + +def pytest_runtest_logstart(nodeid, location): + """ signal the start of a test run. """ + +def pytest_runtest_setup(item): + """ called before ``pytest_runtest_call(item)``. """ + +def pytest_runtest_call(item): + """ called to execute the test ``item``. """ + +def pytest_runtest_teardown(item): + """ called after ``pytest_runtest_call``. """ + +def pytest_runtest_makereport(item, call): + """ return a :py:class:`_pytest.runner.TestReport` object + for the given :py:class:`pytest.Item` and + :py:class:`_pytest.runner.CallInfo`. + """ +pytest_runtest_makereport.firstresult = True + +def pytest_runtest_logreport(report): + """ process item test report. """ + +# special handling for final teardown - somewhat internal for now +def pytest__teardown_final(session): + """ called before test session finishes. """ +pytest__teardown_final.firstresult = True + +def pytest__teardown_final_logerror(report, session): + """ called if runtest_teardown_final failed. """ + +# ------------------------------------------------------------------------- +# test session related hooks +# ------------------------------------------------------------------------- + +def pytest_sessionstart(session): + """ before session.main() is called. """ + +def pytest_sessionfinish(session, exitstatus): + """ whole test run finishes. """ + + +# ------------------------------------------------------------------------- +# hooks for customising the assert methods +# ------------------------------------------------------------------------- + +def pytest_assertrepr_compare(config, op, left, right): + """return explanation for comparisons in failing assert expressions. + + Return None for no custom explanation, otherwise return a list + of strings. The strings will be joined by newlines but any newlines + *in* a string will be escaped. Note that all but the first line will + be indented sligthly, the intention is for the first line to be a summary. + """ + +# ------------------------------------------------------------------------- +# hooks for influencing reporting (invoked from _pytest_terminal) +# ------------------------------------------------------------------------- + +def pytest_report_header(config): + """ return a string to be displayed as header info for terminal reporting.""" + +def pytest_report_teststatus(report): + """ return result-category, shortletter and verbose word for reporting.""" +pytest_report_teststatus.firstresult = True + +def pytest_terminal_summary(terminalreporter): + """ add additional section in terminal summary reporting. """ + +# ------------------------------------------------------------------------- +# doctest hooks +# ------------------------------------------------------------------------- + +def pytest_doctest_prepare_content(content): + """ return processed content for a given doctest""" +pytest_doctest_prepare_content.firstresult = True + +# ------------------------------------------------------------------------- +# error handling and internal debugging hooks +# ------------------------------------------------------------------------- + +def pytest_plugin_registered(plugin, manager): + """ a new py lib plugin got registered. """ + +def pytest_plugin_unregistered(plugin): + """ a py lib plugin got unregistered. """ + +def pytest_internalerror(excrepr): + """ called for internal errors. """ + +def pytest_keyboard_interrupt(excinfo): + """ called for keyboard interrupt. """ diff --git a/_pytest/junitxml.py b/_pytest/junitxml.py new file mode 100644 index 0000000000..db6c5a8b26 --- /dev/null +++ b/_pytest/junitxml.py @@ -0,0 +1,173 @@ +""" report test results in JUnit-XML format, for use with Hudson and build integration servers. + +Based on initial code from Ross Lawley. +""" + +import py +import os +import time + +def pytest_addoption(parser): + group = parser.getgroup("terminal reporting") + group.addoption('--junitxml', action="store", dest="xmlpath", + metavar="path", default=None, + help="create junit-xml style report file at given path.") + group.addoption('--junitprefix', action="store", dest="junitprefix", + metavar="str", default=None, + help="prepend prefix to classnames in junit-xml output") + +def pytest_configure(config): + xmlpath = config.option.xmlpath + if xmlpath: + config._xml = LogXML(xmlpath, config.option.junitprefix) + config.pluginmanager.register(config._xml) + +def pytest_unconfigure(config): + xml = getattr(config, '_xml', None) + if xml: + del config._xml + config.pluginmanager.unregister(xml) + +class LogXML(object): + def __init__(self, logfile, prefix): + self.logfile = logfile + self.prefix = prefix + self.test_logs = [] + self.passed = self.skipped = 0 + self.failed = self.errors = 0 + self._durations = {} + + def _opentestcase(self, report): + names = report.nodeid.split("::") + names[0] = names[0].replace("/", '.') + names = tuple(names) + d = {'time': self._durations.pop(names, "0")} + names = [x.replace(".py", "") for x in names if x != "()"] + classnames = names[:-1] + if self.prefix: + classnames.insert(0, self.prefix) + d['classname'] = ".".join(classnames) + d['name'] = py.xml.escape(names[-1]) + attrs = ['%s="%s"' % item for item in sorted(d.items())] + self.test_logs.append("\n<testcase %s>" % " ".join(attrs)) + + def _closetestcase(self): + self.test_logs.append("</testcase>") + + def appendlog(self, fmt, *args): + args = tuple([py.xml.escape(arg) for arg in args]) + self.test_logs.append(fmt % args) + + def append_pass(self, report): + self.passed += 1 + self._opentestcase(report) + self._closetestcase() + + def append_failure(self, report): + self._opentestcase(report) + #msg = str(report.longrepr.reprtraceback.extraline) + if "xfail" in report.keywords: + self.appendlog( + '<skipped message="xfail-marked test passes unexpectedly"/>') + self.skipped += 1 + else: + self.appendlog('<failure message="test failure">%s</failure>', + report.longrepr) + self.failed += 1 + self._closetestcase() + + def append_collect_failure(self, report): + self._opentestcase(report) + #msg = str(report.longrepr.reprtraceback.extraline) + self.appendlog('<failure message="collection failure">%s</failure>', + report.longrepr) + self._closetestcase() + self.errors += 1 + + def append_collect_skipped(self, report): + self._opentestcase(report) + #msg = str(report.longrepr.reprtraceback.extraline) + self.appendlog('<skipped message="collection skipped">%s</skipped>', + report.longrepr) + self._closetestcase() + self.skipped += 1 + + def append_error(self, report): + self._opentestcase(report) + self.appendlog('<error message="test setup failure">%s</error>', + report.longrepr) + self._closetestcase() + self.errors += 1 + + def append_skipped(self, report): + self._opentestcase(report) + if "xfail" in report.keywords: + self.appendlog( + '<skipped message="expected test failure">%s</skipped>', + report.keywords['xfail']) + else: + self.appendlog("<skipped/>") + self._closetestcase() + self.skipped += 1 + + def pytest_runtest_logreport(self, report): + if report.passed: + self.append_pass(report) + elif report.failed: + if report.when != "call": + self.append_error(report) + else: + self.append_failure(report) + elif report.skipped: + self.append_skipped(report) + + def pytest_runtest_call(self, item, __multicall__): + names = tuple(item.listnames()) + start = time.time() + try: + return __multicall__.execute() + finally: + self._durations[names] = time.time() - start + + def pytest_collectreport(self, report): + if not report.passed: + if report.failed: + self.append_collect_failure(report) + else: + self.append_collect_skipped(report) + + def pytest_internalerror(self, excrepr): + self.errors += 1 + data = py.xml.escape(excrepr) + self.test_logs.append( + '\n<testcase classname="pytest" name="internal">' + ' <error message="internal error">' + '%s</error></testcase>' % data) + + def pytest_sessionstart(self, session): + self.suite_start_time = time.time() + + def pytest_sessionfinish(self, session, exitstatus, __multicall__): + if py.std.sys.version_info[0] < 3: + logfile = py.std.codecs.open(self.logfile, 'w', encoding='utf-8') + else: + logfile = open(self.logfile, 'w', encoding='utf-8') + + suite_stop_time = time.time() + suite_time_delta = suite_stop_time - self.suite_start_time + numtests = self.passed + self.failed + logfile.write('<?xml version="1.0" encoding="utf-8"?>') + logfile.write('<testsuite ') + logfile.write('name="" ') + logfile.write('errors="%i" ' % self.errors) + logfile.write('failures="%i" ' % self.failed) + logfile.write('skips="%i" ' % self.skipped) + logfile.write('tests="%i" ' % numtests) + logfile.write('time="%.3f"' % suite_time_delta) + logfile.write(' >') + logfile.writelines(self.test_logs) + logfile.write('</testsuite>') + logfile.close() + + def pytest_terminal_summary(self, terminalreporter): + terminalreporter.write_sep("-", "generated xml file: %s" % (self.logfile)) diff --git a/_pytest/main.py b/_pytest/main.py new file mode 100644 index 0000000000..f1be30601c --- /dev/null +++ b/_pytest/main.py @@ -0,0 +1,517 @@ +""" core implementation of testing process: init, session, runtest loop. """ + +import py +import pytest, _pytest +import os, sys +tracebackcutdir = py.path.local(_pytest.__file__).dirpath() + +# exitcodes for the command line +EXIT_OK = 0 +EXIT_TESTSFAILED = 1 +EXIT_INTERRUPTED = 2 +EXIT_INTERNALERROR = 3 + +def pytest_addoption(parser): + parser.addini("norecursedirs", "directory patterns to avoid for recursion", + type="args", default=('.*', 'CVS', '_darcs', '{arch}')) + #parser.addini("dirpatterns", + # "patterns specifying possible locations of test files", + # type="linelist", default=["**/test_*.txt", + # "**/test_*.py", "**/*_test.py"] + #) + group = parser.getgroup("general", "running and selection options") + group._addoption('-x', '--exitfirst', action="store_true", default=False, + dest="exitfirst", + help="exit instantly on first error or failed test."), + group._addoption('--maxfail', metavar="num", + action="store", type="int", dest="maxfail", default=0, + help="exit after first num failures or errors.") + + group = parser.getgroup("collect", "collection") + group.addoption('--collectonly', + action="store_true", dest="collectonly", + help="only collect tests, don't execute them."), + group.addoption('--pyargs', action="store_true", + help="try to interpret all arguments as python packages.") + group.addoption("--ignore", action="append", metavar="path", + help="ignore path during collection (multi-allowed).") + group.addoption('--confcutdir', dest="confcutdir", default=None, + metavar="dir", + help="only load conftest.py's relative to specified dir.") + + group = parser.getgroup("debugconfig", + "test session debugging and configuration") + group.addoption('--basetemp', dest="basetemp", default=None, metavar="dir", + help="base temporary directory for this test run.") + + +def pytest_namespace(): + return dict(collect=dict(Item=Item, Collector=Collector, File=File)) + +def pytest_configure(config): + py.test.config = config # compatibiltiy + if config.option.exitfirst: + config.option.maxfail = 1 + +def pytest_cmdline_main(config): + """ default command line protocol for initialization, session, + running tests and reporting. """ + session = Session(config) + session.exitstatus = EXIT_OK + try: + config.pluginmanager.do_configure(config) + config.hook.pytest_sessionstart(session=session) + config.hook.pytest_collection(session=session) + config.hook.pytest_runtestloop(session=session) + except pytest.UsageError: + raise + except KeyboardInterrupt: + excinfo = py.code.ExceptionInfo() + config.hook.pytest_keyboard_interrupt(excinfo=excinfo) + session.exitstatus = EXIT_INTERRUPTED + except: + excinfo = py.code.ExceptionInfo() + config.pluginmanager.notify_exception(excinfo) + session.exitstatus = EXIT_INTERNALERROR + if excinfo.errisinstance(SystemExit): + sys.stderr.write("mainloop: caught Spurious SystemExit!\n") + if not session.exitstatus and session._testsfailed: + session.exitstatus = EXIT_TESTSFAILED + config.hook.pytest_sessionfinish(session=session, + exitstatus=session.exitstatus) + config.pluginmanager.do_unconfigure(config) + return session.exitstatus + +def pytest_collection(session): + session.perform_collect() + hook = session.config.hook + hook.pytest_collection_modifyitems(session=session, + config=session.config, items=session.items) + hook.pytest_collection_finish(session=session) + return True + +def pytest_runtestloop(session): + if session.config.option.collectonly: + return True + for item in session.session.items: + item.config.hook.pytest_runtest_protocol(item=item) + if session.shouldstop: + raise session.Interrupted(session.shouldstop) + return True + +def pytest_ignore_collect(path, config): + p = path.dirpath() + ignore_paths = config._getconftest_pathlist("collect_ignore", path=p) + ignore_paths = ignore_paths or [] + excludeopt = config.getvalue("ignore") + if excludeopt: + ignore_paths.extend([py.path.local(x) for x in excludeopt]) + return path in ignore_paths + +class HookProxy: + def __init__(self, fspath, config): + self.fspath = fspath + self.config = config + def __getattr__(self, name): + hookmethod = getattr(self.config.hook, name) + def call_matching_hooks(**kwargs): + plugins = self.config._getmatchingplugins(self.fspath) + return hookmethod.pcall(plugins, **kwargs) + return call_matching_hooks + +def compatproperty(name): + def fget(self): + #print "retrieving %r property from %s" %(name, self.fspath) + py.log._apiwarn("2.0", "use pytest.%s for " + "test collection and item classes" % name) + return getattr(pytest, name) + return property(fget, None, None, + "deprecated attribute %r, use pytest.%s" % (name,name)) + +class Node(object): + """ base class for all Nodes in the collection tree. + Collector subclasses have children, Items are terminal nodes.""" + + def __init__(self, name, parent=None, config=None, session=None): + #: a unique name with the scope of the parent + self.name = name + + #: the parent collector node. + self.parent = parent + + #: the test config object + self.config = config or parent.config + + #: the collection this node is part of + self.session = session or parent.session + + #: filesystem path where this node was collected from + self.fspath = getattr(parent, 'fspath', None) + self.ihook = self.session.gethookproxy(self.fspath) + self.keywords = {self.name: True} + + Module = compatproperty("Module") + Class = compatproperty("Class") + Instance = compatproperty("Instance") + Function = compatproperty("Function") + File = compatproperty("File") + Item = compatproperty("Item") + + def __repr__(self): + return "<%s %r>" %(self.__class__.__name__, getattr(self, 'name', None)) + + # methods for ordering nodes + @property + def nodeid(self): + try: + return self._nodeid + except AttributeError: + self._nodeid = x = self._makeid() + return x + + def _makeid(self): + return self.parent.nodeid + "::" + self.name + + def __eq__(self, other): + if not isinstance(other, Node): + return False + return self.__class__ == other.__class__ and \ + self.name == other.name and self.parent == other.parent + + def __ne__(self, other): + return not self == other + + def __hash__(self): + return hash((self.name, self.parent)) + + def setup(self): + pass + + def teardown(self): + pass + + def _memoizedcall(self, attrname, function): + exattrname = "_ex_" + attrname + failure = getattr(self, exattrname, None) + if failure is not None: + py.builtin._reraise(failure[0], failure[1], failure[2]) + if hasattr(self, attrname): + return getattr(self, attrname) + try: + res = function() + except py.builtin._sysex: + raise + except: + failure = py.std.sys.exc_info() + setattr(self, exattrname, failure) + raise + setattr(self, attrname, res) + return res + + def listchain(self): + """ return list of all parent collectors up to self, + starting from root of collection tree. """ + l = [self] + while 1: + x = l[0] + if x.parent is not None: # and x.parent.parent is not None: + l.insert(0, x.parent) + else: + return l + + def listnames(self): + return [x.name for x in self.listchain()] + + def getplugins(self): + return self.config._getmatchingplugins(self.fspath) + + def getparent(self, cls): + current = self + while current and not isinstance(current, cls): + current = current.parent + return current + + def _prunetraceback(self, excinfo): + pass + + def _repr_failure_py(self, excinfo, style=None): + if self.config.option.fulltrace: + style="long" + else: + self._prunetraceback(excinfo) + # XXX should excinfo.getrepr record all data and toterminal() + # process it? + if style is None: + if self.config.option.tbstyle == "short": + style = "short" + else: + style = "long" + return excinfo.getrepr(funcargs=True, + showlocals=self.config.option.showlocals, + style=style) + + repr_failure = _repr_failure_py + +class Collector(Node): + """ Collector instances create children through collect() + and thus iteratively build a tree. + """ + class CollectError(Exception): + """ an error during collection, contains a custom message. """ + + def collect(self): + """ returns a list of children (items and collectors) + for this collection node. + """ + raise NotImplementedError("abstract") + + def repr_failure(self, excinfo): + """ represent a collection failure. """ + if excinfo.errisinstance(self.CollectError): + exc = excinfo.value + return str(exc.args[0]) + return self._repr_failure_py(excinfo, style="short") + + def _memocollect(self): + """ internal helper method to cache results of calling collect(). """ + return self._memoizedcall('_collected', lambda: list(self.collect())) + + def _prunetraceback(self, excinfo): + if hasattr(self, 'fspath'): + path = self.fspath + traceback = excinfo.traceback + ntraceback = traceback.cut(path=self.fspath) + if ntraceback == traceback: + ntraceback = ntraceback.cut(excludepath=tracebackcutdir) + excinfo.traceback = ntraceback.filter() + +class FSCollector(Collector): + def __init__(self, fspath, parent=None, config=None, session=None): + fspath = py.path.local(fspath) # xxx only for test_resultlog.py? + name = fspath.basename + if parent is not None: + rel = fspath.relto(parent.fspath) + if rel: + name = rel + name = name.replace(os.sep, "/") + super(FSCollector, self).__init__(name, parent, config, session) + self.fspath = fspath + + def _makeid(self): + if self == self.session: + return "." + relpath = self.session.fspath.bestrelpath(self.fspath) + if os.sep != "/": + relpath = relpath.replace(os.sep, "/") + return relpath + +class File(FSCollector): + """ base class for collecting tests from a file. """ + +class Item(Node): + """ a basic test invocation item. Note that for a single function + there might be multiple test invocation items. + """ + def reportinfo(self): + return self.fspath, None, "" + + @property + def location(self): + try: + return self._location + except AttributeError: + location = self.reportinfo() + fspath = self.session.fspath.bestrelpath(location[0]) + location = (fspath, location[1], str(location[2])) + self._location = location + return location + +class NoMatch(Exception): + """ raised if matching cannot locate a matching names. """ + +class Session(FSCollector): + class Interrupted(KeyboardInterrupt): + """ signals an interrupted test run. """ + __module__ = 'builtins' # for py3 + + def __init__(self, config): + super(Session, self).__init__(py.path.local(), parent=None, + config=config, session=self) + assert self.config.pluginmanager.register(self, name="session", prepend=True) + self._testsfailed = 0 + self.shouldstop = False + self.trace = config.trace.root.get("collection") + self._norecursepatterns = config.getini("norecursedirs") + + def pytest_collectstart(self): + if self.shouldstop: + raise self.Interrupted(self.shouldstop) + + def pytest_runtest_logreport(self, report): + if report.failed and 'xfail' not in getattr(report, 'keywords', []): + self._testsfailed += 1 + maxfail = self.config.getvalue("maxfail") + if maxfail and self._testsfailed >= maxfail: + self.shouldstop = "stopping after %d failures" % ( + self._testsfailed) + pytest_collectreport = pytest_runtest_logreport + + def isinitpath(self, path): + return path in self._initialpaths + + def gethookproxy(self, fspath): + return HookProxy(fspath, self.config) + + def perform_collect(self, args=None, genitems=True): + if args is None: + args = self.config.args + self.trace("perform_collect", self, args) + self.trace.root.indent += 1 + self._notfound = [] + self._initialpaths = set() + self._initialparts = [] + for arg in args: + parts = self._parsearg(arg) + self._initialparts.append(parts) + self._initialpaths.add(parts[0]) + self.ihook.pytest_collectstart(collector=self) + rep = self.ihook.pytest_make_collect_report(collector=self) + self.ihook.pytest_collectreport(report=rep) + self.trace.root.indent -= 1 + if self._notfound: + for arg, exc in self._notfound: + line = "(no name %r in any of %r)" % (arg, exc.args[0]) + raise pytest.UsageError("not found: %s\n%s" %(arg, line)) + if not genitems: + return rep.result + else: + self.items = items = [] + if rep.passed: + for node in rep.result: + self.items.extend(self.genitems(node)) + return items + + def collect(self): + for parts in self._initialparts: + arg = "::".join(map(str, parts)) + self.trace("processing argument", arg) + self.trace.root.indent += 1 + try: + for x in self._collect(arg): + yield x + except NoMatch: + # we are inside a make_report hook so + # we cannot directly pass through the exception + self._notfound.append((arg, sys.exc_info()[1])) + self.trace.root.indent -= 1 + break + self.trace.root.indent -= 1 + + def _collect(self, arg): + names = self._parsearg(arg) + path = names.pop(0) + if path.check(dir=1): + assert not names, "invalid arg %r" %(arg,) + for path in path.visit(fil=lambda x: x.check(file=1), + rec=self._recurse, bf=True, sort=True): + for x in self._collectfile(path): + yield x + else: + assert path.check(file=1) + for x in self.matchnodes(self._collectfile(path), names): + yield x + + def _collectfile(self, path): + ihook = self.gethookproxy(path) + if not self.isinitpath(path): + if ihook.pytest_ignore_collect(path=path, config=self.config): + return () + return ihook.pytest_collect_file(path=path, parent=self) + + def _recurse(self, path): + ihook = self.gethookproxy(path.dirpath()) + if ihook.pytest_ignore_collect(path=path, config=self.config): + return + for pat in self._norecursepatterns: + if path.check(fnmatch=pat): + return False + ihook = self.gethookproxy(path) + ihook.pytest_collect_directory(path=path, parent=self) + return True + + def _tryconvertpyarg(self, x): + try: + mod = __import__(x, None, None, ['__doc__']) + except (ValueError, ImportError): + return x + p = py.path.local(mod.__file__) + if p.purebasename == "__init__": + p = p.dirpath() + else: + p = p.new(basename=p.purebasename+".py") + return p + + def _parsearg(self, arg): + """ return (fspath, names) tuple after checking the file exists. """ + arg = str(arg) + if self.config.option.pyargs: + arg = self._tryconvertpyarg(arg) + parts = str(arg).split("::") + relpath = parts[0].replace("/", os.sep) + path = self.fspath.join(relpath, abs=True) + if not path.check(): + if self.config.option.pyargs: + msg = "file or package not found: " + else: + msg = "file not found: " + raise pytest.UsageError(msg + arg) + parts[0] = path + return parts + + def matchnodes(self, matching, names): + self.trace("matchnodes", matching, names) + self.trace.root.indent += 1 + nodes = self._matchnodes(matching, names) + num = len(nodes) + self.trace("matchnodes finished -> ", num, "nodes") + self.trace.root.indent -= 1 + if num == 0: + raise NoMatch(matching, names[:1]) + return nodes + + def _matchnodes(self, matching, names): + if not matching or not names: + return matching + name = names[0] + assert name + nextnames = names[1:] + resultnodes = [] + for node in matching: + if isinstance(node, pytest.Item): + if not names: + resultnodes.append(node) + continue + assert isinstance(node, pytest.Collector) + node.ihook.pytest_collectstart(collector=node) + rep = node.ihook.pytest_make_collect_report(collector=node) + if rep.passed: + for x in rep.result: + if x.name == name: + resultnodes.extend(self.matchnodes([x], nextnames)) + node.ihook.pytest_collectreport(report=rep) + return resultnodes + + def genitems(self, node): + self.trace("genitems", node) + if isinstance(node, pytest.Item): + node.ihook.pytest_itemcollected(item=node) + yield node + else: + assert isinstance(node, pytest.Collector) + node.ihook.pytest_collectstart(collector=node) + rep = node.ihook.pytest_make_collect_report(collector=node) + if rep.passed: + for subnode in rep.result: + for x in self.genitems(subnode): + yield x + node.ihook.pytest_collectreport(report=rep) diff --git a/_pytest/mark.py b/_pytest/mark.py new file mode 100644 index 0000000000..5383e3a9d5 --- /dev/null +++ b/_pytest/mark.py @@ -0,0 +1,176 @@ +""" generic mechanism for marking and selecting python functions. """ +import pytest, py + +def pytest_namespace(): + return {'mark': MarkGenerator()} + +def pytest_addoption(parser): + group = parser.getgroup("general") + group._addoption('-k', + action="store", dest="keyword", default='', metavar="KEYWORDEXPR", + help="only run tests which match given keyword expression. " + "An expression consists of space-separated terms. " + "Each term must match. Precede a term with '-' to negate. " + "Terminate expression with ':' to make the first match match " + "all subsequent tests (usually file-order). ") + +def pytest_collection_modifyitems(items, config): + keywordexpr = config.option.keyword + if not keywordexpr: + return + selectuntil = False + if keywordexpr[-1] == ":": + selectuntil = True + keywordexpr = keywordexpr[:-1] + + remaining = [] + deselected = [] + for colitem in items: + if keywordexpr and skipbykeyword(colitem, keywordexpr): + deselected.append(colitem) + else: + remaining.append(colitem) + if selectuntil: + keywordexpr = None + + if deselected: + config.hook.pytest_deselected(items=deselected) + items[:] = remaining + +def skipbykeyword(colitem, keywordexpr): + """ return True if they given keyword expression means to + skip this collector/item. + """ + if not keywordexpr: + return + + itemkeywords = getkeywords(colitem) + for key in filter(None, keywordexpr.split()): + eor = key[:1] == '-' + if eor: + key = key[1:] + if not (eor ^ matchonekeyword(key, itemkeywords)): + return True + +def getkeywords(node): + keywords = {} + while node is not None: + keywords.update(node.keywords) + node = node.parent + return keywords + + +def matchonekeyword(key, itemkeywords): + for elem in key.split("."): + for kw in itemkeywords: + if elem in kw: + break + else: + return False + return True + +class MarkGenerator: + """ Factory for :class:`MarkDecorator` objects - exposed as + a ``py.test.mark`` singleton instance. Example:: + + import py + @py.test.mark.slowtest + def test_function(): + pass + + will set a 'slowtest' :class:`MarkInfo` object + on the ``test_function`` object. """ + + def __getattr__(self, name): + if name[0] == "_": + raise AttributeError(name) + return MarkDecorator(name) + +class MarkDecorator: + """ A decorator for test functions and test classes. When applied + it will create :class:`MarkInfo` objects which may be + :ref:`retrieved by hooks as item keywords` MarkDecorator instances + are usually created by writing:: + + mark1 = py.test.mark.NAME # simple MarkDecorator + mark2 = py.test.mark.NAME(name1=value) # parametrized MarkDecorator + + and can then be applied as decorators to test functions:: + + @mark2 + def test_function(): + pass + """ + def __init__(self, name, args=None, kwargs=None): + self.markname = name + self.args = args or () + self.kwargs = kwargs or {} + + def __repr__(self): + d = self.__dict__.copy() + name = d.pop('markname') + return "<MarkDecorator %r %r>" %(name, d) + + def __call__(self, *args, **kwargs): + """ if passed a single callable argument: decorate it with mark info. + otherwise add *args/**kwargs in-place to mark information. """ + if args: + func = args[0] + if len(args) == 1 and hasattr(func, '__call__') or \ + hasattr(func, '__bases__'): + if hasattr(func, '__bases__'): + if hasattr(func, 'pytestmark'): + l = func.pytestmark + if not isinstance(l, list): + func.pytestmark = [l, self] + else: + l.append(self) + else: + func.pytestmark = [self] + else: + holder = getattr(func, self.markname, None) + if holder is None: + holder = MarkInfo(self.markname, self.args, self.kwargs) + setattr(func, self.markname, holder) + else: + holder.kwargs.update(self.kwargs) + holder.args += self.args + return func + kw = self.kwargs.copy() + kw.update(kwargs) + args = self.args + args + return self.__class__(self.markname, args=args, kwargs=kw) + +class MarkInfo: + """ Marking object created by :class:`MarkDecorator` instances. """ + def __init__(self, name, args, kwargs): + #: name of attribute + self.name = name + #: positional argument list, empty if none specified + self.args = args + #: keyword argument dictionary, empty if nothing specified + self.kwargs = kwargs + + def __repr__(self): + return "<MarkInfo %r args=%r kwargs=%r>" % ( + self._name, self.args, self.kwargs) + +def pytest_itemcollected(item): + if not isinstance(item, pytest.Function): + return + try: + func = item.obj.__func__ + except AttributeError: + func = getattr(item.obj, 'im_func', item.obj) + pyclasses = (pytest.Class, pytest.Module) + for node in item.listchain(): + if isinstance(node, pyclasses): + marker = getattr(node.obj, 'pytestmark', None) + if marker is not None: + if isinstance(marker, list): + for mark in marker: + mark(func) + else: + marker(func) + node = node.parent + item.keywords.update(py.builtin._getfuncdict(func)) diff --git a/_pytest/monkeypatch.py b/_pytest/monkeypatch.py new file mode 100644 index 0000000000..1ab0b382f6 --- /dev/null +++ b/_pytest/monkeypatch.py @@ -0,0 +1,103 @@ +""" monkeypatching and mocking functionality. """ + +import os, sys + +def pytest_funcarg__monkeypatch(request): + """The returned ``monkeypatch`` funcarg provides these + helper methods to modify objects, dictionaries or os.environ:: + + monkeypatch.setattr(obj, name, value, raising=True) + monkeypatch.delattr(obj, name, raising=True) + monkeypatch.setitem(mapping, name, value) + monkeypatch.delitem(obj, name, raising=True) + monkeypatch.setenv(name, value, prepend=False) + monkeypatch.delenv(name, value, raising=True) + monkeypatch.syspath_prepend(path) + + All modifications will be undone when the requesting + test function finished its execution. The ``raising`` + parameter determines if a KeyError or AttributeError + will be raised if the set/deletion operation has no target. + """ + mpatch = monkeypatch() + request.addfinalizer(mpatch.undo) + return mpatch + +notset = object() + +class monkeypatch: + """ object keeping a record of setattr/item/env/syspath changes. """ + def __init__(self): + self._setattr = [] + self._setitem = [] + + def setattr(self, obj, name, value, raising=True): + """ set attribute ``name`` on ``obj`` to ``value``, by default + raise AttributeEror if the attribute did not exist. """ + oldval = getattr(obj, name, notset) + if raising and oldval is notset: + raise AttributeError("%r has no attribute %r" %(obj, name)) + self._setattr.insert(0, (obj, name, oldval)) + setattr(obj, name, value) + + def delattr(self, obj, name, raising=True): + """ delete attribute ``name`` from ``obj``, by default raise + AttributeError it the attribute did not previously exist. """ + if not hasattr(obj, name): + if raising: + raise AttributeError(name) + else: + self._setattr.insert(0, (obj, name, getattr(obj, name, notset))) + delattr(obj, name) + + def setitem(self, dic, name, value): + """ set dictionary entry ``name`` to value. """ + self._setitem.insert(0, (dic, name, dic.get(name, notset))) + dic[name] = value + + def delitem(self, dic, name, raising=True): + """ delete ``name`` from dict, raise KeyError if it doesn't exist.""" + if name not in dic: + if raising: + raise KeyError(name) + else: + self._setitem.insert(0, (dic, name, dic.get(name, notset))) + del dic[name] + + def setenv(self, name, value, prepend=None): + """ set environment variable ``name`` to ``value``. if ``prepend`` + is a character, read the current environment variable value + and prepend the ``value`` adjoined with the ``prepend`` character.""" + value = str(value) + if prepend and name in os.environ: + value = value + prepend + os.environ[name] + self.setitem(os.environ, name, value) + + def delenv(self, name, raising=True): + """ delete ``name`` from environment, raise KeyError it not exists.""" + self.delitem(os.environ, name, raising=raising) + + def syspath_prepend(self, path): + """ prepend ``path`` to ``sys.path`` list of import locations. """ + if not hasattr(self, '_savesyspath'): + self._savesyspath = sys.path[:] + sys.path.insert(0, str(path)) + + def undo(self): + """ undo previous changes. This call consumes the + undo stack. Calling it a second time has no effect unless + you do more monkeypatching after the undo call.""" + for obj, name, value in self._setattr: + if value is not notset: + setattr(obj, name, value) + else: + delattr(obj, name) + self._setattr[:] = [] + for dictionary, name, value in self._setitem: + if value is notset: + del dictionary[name] + else: + dictionary[name] = value + self._setitem[:] = [] + if hasattr(self, '_savesyspath'): + sys.path[:] = self._savesyspath diff --git a/_pytest/nose.py b/_pytest/nose.py new file mode 100644 index 0000000000..2f90a93f8d --- /dev/null +++ b/_pytest/nose.py @@ -0,0 +1,47 @@ +""" run test suites written for nose. """ + +import pytest, py +import inspect +import sys + +def pytest_runtest_makereport(__multicall__, item, call): + SkipTest = getattr(sys.modules.get('nose', None), 'SkipTest', None) + if SkipTest: + if call.excinfo and call.excinfo.errisinstance(SkipTest): + # let's substitute the excinfo with a py.test.skip one + call2 = call.__class__(lambda: py.test.skip(str(call.excinfo.value)), call.when) + call.excinfo = call2.excinfo + + +def pytest_runtest_setup(item): + if isinstance(item, (pytest.Function)): + if isinstance(item.parent, pytest.Generator): + gen = item.parent + if not hasattr(gen, '_nosegensetup'): + call_optional(gen.obj, 'setup') + if isinstance(gen.parent, pytest.Instance): + call_optional(gen.parent.obj, 'setup') + gen._nosegensetup = True + if not call_optional(item.obj, 'setup'): + # call module level setup if there is no object level one + call_optional(item.parent.obj, 'setup') + +def pytest_runtest_teardown(item): + if isinstance(item, pytest.Function): + if not call_optional(item.obj, 'teardown'): + call_optional(item.parent.obj, 'teardown') + #if hasattr(item.parent, '_nosegensetup'): + # #call_optional(item._nosegensetup, 'teardown') + # del item.parent._nosegensetup + +def pytest_make_collect_report(collector): + if isinstance(collector, pytest.Generator): + call_optional(collector.obj, 'setup') + +def call_optional(obj, name): + method = getattr(obj, name, None) + if method: + # If there's any problems allow the exception to raise rather than + # silently ignoring them + method() + return True diff --git a/_pytest/pastebin.py b/_pytest/pastebin.py new file mode 100644 index 0000000000..52d7a65e60 --- /dev/null +++ b/_pytest/pastebin.py @@ -0,0 +1,63 @@ +""" submit failure or test session information to a pastebin service. """ +import py, sys + +class url: + base = "http://paste.pocoo.org" + xmlrpc = base + "/xmlrpc/" + show = base + "/show/" + +def pytest_addoption(parser): + group = parser.getgroup("terminal reporting") + group._addoption('--pastebin', metavar="mode", + action='store', dest="pastebin", default=None, + type="choice", choices=['failed', 'all'], + help="send failed|all info to Pocoo pastebin service.") + +def pytest_configure(__multicall__, config): + import tempfile + __multicall__.execute() + if config.option.pastebin == "all": + config._pastebinfile = tempfile.TemporaryFile('w+') + tr = config.pluginmanager.getplugin('terminalreporter') + oldwrite = tr._tw.write + def tee_write(s, **kwargs): + oldwrite(s, **kwargs) + config._pastebinfile.write(str(s)) + tr._tw.write = tee_write + +def pytest_unconfigure(config): + if hasattr(config, '_pastebinfile'): + config._pastebinfile.seek(0) + sessionlog = config._pastebinfile.read() + config._pastebinfile.close() + del config._pastebinfile + proxyid = getproxy().newPaste("python", sessionlog) + pastebinurl = "%s%s" % (url.show, proxyid) + sys.stderr.write("pastebin session-log: %s\n" % pastebinurl) + tr = config.pluginmanager.getplugin('terminalreporter') + del tr._tw.__dict__['write'] + +def getproxy(): + return py.std.xmlrpclib.ServerProxy(url.xmlrpc).pastes + +def pytest_terminal_summary(terminalreporter): + if terminalreporter.config.option.pastebin != "failed": + return + tr = terminalreporter + if 'failed' in tr.stats: + terminalreporter.write_sep("=", "Sending information to Paste Service") + if tr.config.option.debug: + terminalreporter.write_line("xmlrpcurl: %s" %(url.xmlrpc,)) + serverproxy = getproxy() + for rep in terminalreporter.stats.get('failed'): + try: + msg = rep.longrepr.reprtraceback.reprentries[-1].reprfileloc + except AttributeError: + msg = tr._getfailureheadline(rep) + tw = py.io.TerminalWriter(stringio=True) + rep.toterminal(tw) + s = tw.stringio.getvalue() + assert len(s) + proxyid = serverproxy.newPaste("python", s) + pastebinurl = "%s%s" % (url.show, proxyid) + tr.write_line("%s --> %s" %(msg, pastebinurl)) diff --git a/_pytest/pdb.py b/_pytest/pdb.py new file mode 100644 index 0000000000..507f93d7a3 --- /dev/null +++ b/_pytest/pdb.py @@ -0,0 +1,76 @@ +""" interactive debugging with PDB, the Python Debugger. """ + +import pytest, py +import sys + +def pytest_addoption(parser): + group = parser.getgroup("general") + group._addoption('--pdb', + action="store_true", dest="usepdb", default=False, + help="start the interactive Python debugger on errors.") + +def pytest_namespace(): + return {'set_trace': pytestPDB().set_trace} + +def pytest_configure(config): + if config.getvalue("usepdb"): + config.pluginmanager.register(PdbInvoke(), 'pdbinvoke') + +class pytestPDB: + """ Pseudo PDB that defers to the real pdb. """ + item = None + + def set_trace(self): + """ invoke PDB set_trace debugging, dropping any IO capturing. """ + frame = sys._getframe().f_back + item = getattr(self, 'item', None) + if item is not None: + capman = item.config.pluginmanager.getplugin("capturemanager") + out, err = capman.suspendcapture() + if hasattr(item, 'outerr'): + item.outerr = (item.outerr[0] + out, item.outerr[1] + err) + tw = py.io.TerminalWriter() + tw.line() + tw.sep(">", "PDB set_trace (IO-capturing turned off)") + py.std.pdb.Pdb().set_trace(frame) + +def pdbitem(item): + pytestPDB.item = item +pytest_runtest_setup = pytest_runtest_call = pytest_runtest_teardown = pdbitem + +def pytest_runtest_makereport(): + pytestPDB.item = None + +class PdbInvoke: + @pytest.mark.tryfirst + def pytest_runtest_makereport(self, item, call, __multicall__): + rep = __multicall__.execute() + if not call.excinfo or \ + call.excinfo.errisinstance(pytest.skip.Exception) or \ + call.excinfo.errisinstance(py.std.bdb.BdbQuit): + return rep + if "xfail" in rep.keywords: + return rep + # we assume that the above execute() suspended capturing + tw = py.io.TerminalWriter() + tw.line() + tw.sep(">", "traceback") + rep.toterminal(tw) + tw.sep(">", "entering PDB") + post_mortem(call.excinfo._excinfo[2]) + rep._pdbshown = True + return rep + +def post_mortem(t): + pdb = py.std.pdb + class Pdb(pdb.Pdb): + def get_stack(self, f, t): + stack, i = pdb.Pdb.get_stack(self, f, t) + if f is None: + i = max(0, len(stack) - 1) + while i and stack[i][0].f_locals.get("__tracebackhide__", False): + i-=1 + return stack, i + p = Pdb() + p.reset() + p.interaction(None, t) diff --git a/_pytest/pytester.py b/_pytest/pytester.py new file mode 100644 index 0000000000..f9da97b188 --- /dev/null +++ b/_pytest/pytester.py @@ -0,0 +1,674 @@ +""" (disabled by default) support for testing py.test and py.test plugins. """ + +import py, pytest +import sys, os +import re +import inspect +import time +from fnmatch import fnmatch +from _pytest.main import Session +from py.builtin import print_ +from _pytest.core import HookRelay + +def pytest_addoption(parser): + group = parser.getgroup("pylib") + group.addoption('--no-tools-on-path', + action="store_true", dest="notoolsonpath", default=False, + help=("discover tools on PATH instead of going through py.cmdline.") + ) + +def pytest_configure(config): + # This might be called multiple times. Only take the first. + global _pytest_fullpath + import pytest + try: + _pytest_fullpath + except NameError: + _pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc")) + +def pytest_funcarg___pytest(request): + return PytestArg(request) + +class PytestArg: + def __init__(self, request): + self.request = request + + def gethookrecorder(self, hook): + hookrecorder = HookRecorder(hook._pm) + hookrecorder.start_recording(hook._hookspecs) + self.request.addfinalizer(hookrecorder.finish_recording) + return hookrecorder + +class ParsedCall: + def __init__(self, name, locals): + assert '_name' not in locals + self.__dict__.update(locals) + self.__dict__.pop('self') + self._name = name + + def __repr__(self): + d = self.__dict__.copy() + del d['_name'] + return "<ParsedCall %r(**%r)>" %(self._name, d) + +class HookRecorder: + def __init__(self, pluginmanager): + self._pluginmanager = pluginmanager + self.calls = [] + self._recorders = {} + + def start_recording(self, hookspecs): + if not isinstance(hookspecs, (list, tuple)): + hookspecs = [hookspecs] + for hookspec in hookspecs: + assert hookspec not in self._recorders + class RecordCalls: + _recorder = self + for name, method in vars(hookspec).items(): + if name[0] != "_": + setattr(RecordCalls, name, self._makecallparser(method)) + recorder = RecordCalls() + self._recorders[hookspec] = recorder + self._pluginmanager.register(recorder) + self.hook = HookRelay(hookspecs, pm=self._pluginmanager, + prefix="pytest_") + + def finish_recording(self): + for recorder in self._recorders.values(): + self._pluginmanager.unregister(recorder) + self._recorders.clear() + + def _makecallparser(self, method): + name = method.__name__ + args, varargs, varkw, default = py.std.inspect.getargspec(method) + if not args or args[0] != "self": + args.insert(0, 'self') + fspec = py.std.inspect.formatargspec(args, varargs, varkw, default) + # we use exec because we want to have early type + # errors on wrong input arguments, using + # *args/**kwargs delays this and gives errors + # elsewhere + exec (py.code.compile(""" + def %(name)s%(fspec)s: + self._recorder.calls.append( + ParsedCall(%(name)r, locals())) + """ % locals())) + return locals()[name] + + def getcalls(self, names): + if isinstance(names, str): + names = names.split() + for name in names: + for cls in self._recorders: + if name in vars(cls): + break + else: + raise ValueError("callname %r not found in %r" %( + name, self._recorders.keys())) + l = [] + for call in self.calls: + if call._name in names: + l.append(call) + return l + + def contains(self, entries): + __tracebackhide__ = True + from py.builtin import print_ + i = 0 + entries = list(entries) + backlocals = py.std.sys._getframe(1).f_locals + while entries: + name, check = entries.pop(0) + for ind, call in enumerate(self.calls[i:]): + if call._name == name: + print_("NAMEMATCH", name, call) + if eval(check, backlocals, call.__dict__): + print_("CHECKERMATCH", repr(check), "->", call) + else: + print_("NOCHECKERMATCH", repr(check), "-", call) + continue + i += ind + 1 + break + print_("NONAMEMATCH", name, "with", call) + else: + py.test.fail("could not find %r check %r" % (name, check)) + + def popcall(self, name): + __tracebackhide__ = True + for i, call in enumerate(self.calls): + if call._name == name: + del self.calls[i] + return call + lines = ["could not find call %r, in:" % (name,)] + lines.extend([" %s" % str(x) for x in self.calls]) + py.test.fail("\n".join(lines)) + + def getcall(self, name): + l = self.getcalls(name) + assert len(l) == 1, (name, l) + return l[0] + + +def pytest_funcarg__linecomp(request): + return LineComp() + +def pytest_funcarg__LineMatcher(request): + return LineMatcher + +def pytest_funcarg__testdir(request): + tmptestdir = TmpTestdir(request) + return tmptestdir + +rex_outcome = re.compile("(\d+) (\w+)") +class RunResult: + def __init__(self, ret, outlines, errlines, duration): + self.ret = ret + self.outlines = outlines + self.errlines = errlines + self.stdout = LineMatcher(outlines) + self.stderr = LineMatcher(errlines) + self.duration = duration + + def parseoutcomes(self): + for line in reversed(self.outlines): + if 'seconds' in line: + outcomes = rex_outcome.findall(line) + if outcomes: + d = {} + for num, cat in outcomes: + d[cat] = int(num) + return d + +class TmpTestdir: + def __init__(self, request): + self.request = request + self.Config = request.config.__class__ + self._pytest = request.getfuncargvalue("_pytest") + # XXX remove duplication with tmpdir plugin + basetmp = request.config._tmpdirhandler.ensuretemp("testdir") + name = request.function.__name__ + for i in range(100): + try: + tmpdir = basetmp.mkdir(name + str(i)) + except py.error.EEXIST: + continue + break + # we need to create another subdir + # because Directory.collect() currently loads + # conftest.py from sibling directories + self.tmpdir = tmpdir.mkdir(name) + self.plugins = [] + self._syspathremove = [] + self.chdir() # always chdir + self.request.addfinalizer(self.finalize) + + def __repr__(self): + return "<TmpTestdir %r>" % (self.tmpdir,) + + def finalize(self): + for p in self._syspathremove: + py.std.sys.path.remove(p) + if hasattr(self, '_olddir'): + self._olddir.chdir() + # delete modules that have been loaded from tmpdir + for name, mod in list(sys.modules.items()): + if mod: + fn = getattr(mod, '__file__', None) + if fn and fn.startswith(str(self.tmpdir)): + del sys.modules[name] + + def getreportrecorder(self, obj): + if hasattr(obj, 'config'): + obj = obj.config + if hasattr(obj, 'hook'): + obj = obj.hook + assert hasattr(obj, '_hookspecs'), obj + reprec = ReportRecorder(obj) + reprec.hookrecorder = self._pytest.gethookrecorder(obj) + reprec.hook = reprec.hookrecorder.hook + return reprec + + def chdir(self): + old = self.tmpdir.chdir() + if not hasattr(self, '_olddir'): + self._olddir = old + + def _makefile(self, ext, args, kwargs): + items = list(kwargs.items()) + if args: + source = "\n".join(map(str, args)) + "\n" + basename = self.request.function.__name__ + items.insert(0, (basename, source)) + ret = None + for name, value in items: + p = self.tmpdir.join(name).new(ext=ext) + source = str(py.code.Source(value)).lstrip() + p.write(source.encode("utf-8"), "wb") + if ret is None: + ret = p + return ret + + + def makefile(self, ext, *args, **kwargs): + return self._makefile(ext, args, kwargs) + + def makeini(self, source): + return self.makefile('cfg', setup=source) + + def makeconftest(self, source): + return self.makepyfile(conftest=source) + + def makeini(self, source): + return self.makefile('.ini', tox=source) + + def getinicfg(self, source): + p = self.makeini(source) + return py.iniconfig.IniConfig(p)['pytest'] + + def makepyfile(self, *args, **kwargs): + return self._makefile('.py', args, kwargs) + + def maketxtfile(self, *args, **kwargs): + return self._makefile('.txt', args, kwargs) + + def syspathinsert(self, path=None): + if path is None: + path = self.tmpdir + py.std.sys.path.insert(0, str(path)) + self._syspathremove.append(str(path)) + + def mkdir(self, name): + return self.tmpdir.mkdir(name) + + def mkpydir(self, name): + p = self.mkdir(name) + p.ensure("__init__.py") + return p + + Session = Session + def getnode(self, config, arg): + session = Session(config) + assert '::' not in str(arg) + p = py.path.local(arg) + x = session.fspath.bestrelpath(p) + return session.perform_collect([x], genitems=False)[0] + + def getpathnode(self, path): + config = self.parseconfig(path) + session = Session(config) + x = session.fspath.bestrelpath(path) + return session.perform_collect([x], genitems=False)[0] + + def genitems(self, colitems): + session = colitems[0].session + result = [] + for colitem in colitems: + result.extend(session.genitems(colitem)) + return result + + def inline_genitems(self, *args): + #config = self.parseconfig(*args) + config = self.parseconfigure(*args) + rec = self.getreportrecorder(config) + session = Session(config) + session.perform_collect() + return session.items, rec + + def runitem(self, source): + # used from runner functional tests + item = self.getitem(source) + # the test class where we are called from wants to provide the runner + testclassinstance = py.builtin._getimself(self.request.function) + runner = testclassinstance.getrunner() + return runner(item) + + def inline_runsource(self, source, *cmdlineargs): + p = self.makepyfile(source) + l = list(cmdlineargs) + [p] + return self.inline_run(*l) + + def inline_runsource1(self, *args): + args = list(args) + source = args.pop() + p = self.makepyfile(source) + l = list(args) + [p] + reprec = self.inline_run(*l) + reports = reprec.getreports("pytest_runtest_logreport") + assert len(reports) == 1, reports + return reports[0] + + def inline_run(self, *args): + args = ("-s", ) + args # otherwise FD leakage + config = self.parseconfig(*args) + reprec = self.getreportrecorder(config) + #config.pluginmanager.do_configure(config) + config.hook.pytest_cmdline_main(config=config) + #config.pluginmanager.do_unconfigure(config) + return reprec + + def config_preparse(self): + config = self.Config() + for plugin in self.plugins: + if isinstance(plugin, str): + config.pluginmanager.import_plugin(plugin) + else: + if isinstance(plugin, dict): + plugin = PseudoPlugin(plugin) + if not config.pluginmanager.isregistered(plugin): + config.pluginmanager.register(plugin) + return config + + def parseconfig(self, *args): + if not args: + args = (self.tmpdir,) + config = self.config_preparse() + args = list(args) + for x in args: + if str(x).startswith('--basetemp'): + break + else: + args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp')) + config.parse(args) + return config + + def reparseconfig(self, args=None): + """ this is used from tests that want to re-invoke parse(). """ + if not args: + args = [self.tmpdir] + oldconfig = getattr(py.test, 'config', None) + try: + c = py.test.config = self.Config() + c.basetemp = py.path.local.make_numbered_dir(prefix="reparse", + keep=0, rootdir=self.tmpdir, lock_timeout=None) + c.parse(args) + return c + finally: + py.test.config = oldconfig + + def parseconfigure(self, *args): + config = self.parseconfig(*args) + config.pluginmanager.do_configure(config) + self.request.addfinalizer(lambda: + config.pluginmanager.do_unconfigure(config)) + return config + + def getitem(self, source, funcname="test_func"): + for item in self.getitems(source): + if item.name == funcname: + return item + assert 0, "%r item not found in module:\n%s" %(funcname, source) + + def getitems(self, source): + modcol = self.getmodulecol(source) + return self.genitems([modcol]) + + def getmodulecol(self, source, configargs=(), withinit=False): + kw = {self.request.function.__name__: py.code.Source(source).strip()} + path = self.makepyfile(**kw) + if withinit: + self.makepyfile(__init__ = "#") + self.config = config = self.parseconfigure(path, *configargs) + node = self.getnode(config, path) + #config.pluginmanager.do_unconfigure(config) + return node + + def collect_by_name(self, modcol, name): + for colitem in modcol._memocollect(): + if colitem.name == name: + return colitem + + def popen(self, cmdargs, stdout, stderr, **kw): + env = os.environ.copy() + env['PYTHONPATH'] = os.pathsep.join(filter(None, [ + str(os.getcwd()), env.get('PYTHONPATH', '')])) + kw['env'] = env + #print "env", env + return py.std.subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw) + + def pytestmain(self, *args, **kwargs): + ret = pytest.main(*args, **kwargs) + if ret == 2: + raise KeyboardInterrupt() + def run(self, *cmdargs): + return self._run(*cmdargs) + + def _run(self, *cmdargs): + cmdargs = [str(x) for x in cmdargs] + p1 = self.tmpdir.join("stdout") + p2 = self.tmpdir.join("stderr") + print_("running", cmdargs, "curdir=", py.path.local()) + f1 = p1.open("wb") + f2 = p2.open("wb") + now = time.time() + popen = self.popen(cmdargs, stdout=f1, stderr=f2, + close_fds=(sys.platform != "win32")) + ret = popen.wait() + f1.close() + f2.close() + out = p1.read("rb") + out = getdecoded(out).splitlines() + err = p2.read("rb") + err = getdecoded(err).splitlines() + def dump_lines(lines, fp): + try: + for line in lines: + py.builtin.print_(line, file=fp) + except UnicodeEncodeError: + print("couldn't print to %s because of encoding" % (fp,)) + dump_lines(out, sys.stdout) + dump_lines(err, sys.stderr) + return RunResult(ret, out, err, time.time()-now) + + def runpybin(self, scriptname, *args): + fullargs = self._getpybinargs(scriptname) + args + return self.run(*fullargs) + + def _getpybinargs(self, scriptname): + if not self.request.config.getvalue("notoolsonpath"): + # XXX we rely on script refering to the correct environment + # we cannot use "(py.std.sys.executable,script)" + # becaue on windows the script is e.g. a py.test.exe + return (py.std.sys.executable, _pytest_fullpath,) + else: + py.test.skip("cannot run %r with --no-tools-on-path" % scriptname) + + def runpython(self, script, prepend=True): + if prepend: + s = self._getsysprepend() + if s: + script.write(s + "\n" + script.read()) + return self.run(sys.executable, script) + + def _getsysprepend(self): + if self.request.config.getvalue("notoolsonpath"): + s = "import sys;sys.path.insert(0,%r);" % str(py._pydir.dirpath()) + else: + s = "" + return s + + def runpython_c(self, command): + command = self._getsysprepend() + command + return self.run(py.std.sys.executable, "-c", command) + + def runpytest(self, *args): + p = py.path.local.make_numbered_dir(prefix="runpytest-", + keep=None, rootdir=self.tmpdir) + args = ('--basetemp=%s' % p, ) + args + #for x in args: + # if '--confcutdir' in str(x): + # break + #else: + # pass + # args = ('--confcutdir=.',) + args + plugins = [x for x in self.plugins if isinstance(x, str)] + if plugins: + args = ('-p', plugins[0]) + args + return self.runpybin("py.test", *args) + + def spawn_pytest(self, string, expect_timeout=10.0): + if self.request.config.getvalue("notoolsonpath"): + py.test.skip("--no-tools-on-path prevents running pexpect-spawn tests") + basetemp = self.tmpdir.mkdir("pexpect") + invoke = " ".join(map(str, self._getpybinargs("py.test"))) + cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string) + return self.spawn(cmd, expect_timeout=expect_timeout) + + def spawn(self, cmd, expect_timeout=10.0): + pexpect = py.test.importorskip("pexpect", "2.4") + if hasattr(sys, 'pypy_version_info') and '64' in py.std.platform.machine(): + pytest.skip("pypy-64 bit not supported") + logfile = self.tmpdir.join("spawn.out") + child = pexpect.spawn(cmd, logfile=logfile.open("w")) + child.timeout = expect_timeout + return child + +def getdecoded(out): + try: + return out.decode("utf-8") + except UnicodeDecodeError: + return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % ( + py.io.saferepr(out),) + +class PseudoPlugin: + def __init__(self, vars): + self.__dict__.update(vars) + +class ReportRecorder(object): + def __init__(self, hook): + self.hook = hook + self.pluginmanager = hook._pm + self.pluginmanager.register(self) + + def getcall(self, name): + return self.hookrecorder.getcall(name) + + def popcall(self, name): + return self.hookrecorder.popcall(name) + + def getcalls(self, names): + """ return list of ParsedCall instances matching the given eventname. """ + return self.hookrecorder.getcalls(names) + + # functionality for test reports + + def getreports(self, names="pytest_runtest_logreport pytest_collectreport"): + return [x.report for x in self.getcalls(names)] + + def matchreport(self, inamepart="", names="pytest_runtest_logreport pytest_collectreport", when=None): + """ return a testreport whose dotted import path matches """ + l = [] + for rep in self.getreports(names=names): + if when and getattr(rep, 'when', None) != when: + continue + if not inamepart or inamepart in rep.nodeid.split("::"): + l.append(rep) + if not l: + raise ValueError("could not find test report matching %r: no test reports at all!" % + (inamepart,)) + if len(l) > 1: + raise ValueError("found more than one testreport matching %r: %s" %( + inamepart, l)) + return l[0] + + def getfailures(self, names='pytest_runtest_logreport pytest_collectreport'): + return [rep for rep in self.getreports(names) if rep.failed] + + def getfailedcollections(self): + return self.getfailures('pytest_collectreport') + + def listoutcomes(self): + passed = [] + skipped = [] + failed = [] + for rep in self.getreports("pytest_runtest_logreport"): + if rep.passed: + if rep.when == "call": + passed.append(rep) + elif rep.skipped: + skipped.append(rep) + elif rep.failed: + failed.append(rep) + return passed, skipped, failed + + def countoutcomes(self): + return [len(x) for x in self.listoutcomes()] + + def assertoutcome(self, passed=0, skipped=0, failed=0): + realpassed, realskipped, realfailed = self.listoutcomes() + assert passed == len(realpassed) + assert skipped == len(realskipped) + assert failed == len(realfailed) + + def clear(self): + self.hookrecorder.calls[:] = [] + + def unregister(self): + self.pluginmanager.unregister(self) + self.hookrecorder.finish_recording() + +class LineComp: + def __init__(self): + self.stringio = py.io.TextIO() + + def assert_contains_lines(self, lines2): + """ assert that lines2 are contained (linearly) in lines1. + return a list of extralines found. + """ + __tracebackhide__ = True + val = self.stringio.getvalue() + self.stringio.truncate(0) + self.stringio.seek(0) + lines1 = val.split("\n") + return LineMatcher(lines1).fnmatch_lines(lines2) + +class LineMatcher: + def __init__(self, lines): + self.lines = lines + + def str(self): + return "\n".join(self.lines) + + def _getlines(self, lines2): + if isinstance(lines2, str): + lines2 = py.code.Source(lines2) + if isinstance(lines2, py.code.Source): + lines2 = lines2.strip().lines + return lines2 + + def fnmatch_lines_random(self, lines2): + lines2 = self._getlines(lines2) + for line in lines2: + for x in self.lines: + if line == x or fnmatch(x, line): + print_("matched: ", repr(line)) + break + else: + raise ValueError("line %r not found in output" % line) + + def fnmatch_lines(self, lines2): + def show(arg1, arg2): + py.builtin.print_(arg1, arg2, file=py.std.sys.stderr) + lines2 = self._getlines(lines2) + lines1 = self.lines[:] + nextline = None + extralines = [] + __tracebackhide__ = True + for line in lines2: + nomatchprinted = False + while lines1: + nextline = lines1.pop(0) + if line == nextline: + show("exact match:", repr(line)) + break + elif fnmatch(nextline, line): + show("fnmatch:", repr(line)) + show(" with:", repr(nextline)) + break + else: + if not nomatchprinted: + show("nomatch:", repr(line)) + nomatchprinted = True + show(" and:", repr(nextline)) + extralines.append(nextline) + else: + py.test.fail("remains unmatched: %r, see stderr" % (line,)) diff --git a/_pytest/python.py b/_pytest/python.py new file mode 100644 index 0000000000..a2b7bb35d1 --- /dev/null +++ b/_pytest/python.py @@ -0,0 +1,855 @@ +""" Python test discovery, setup and run of test functions. """ +import py +import inspect +import sys +import pytest +from py._code.code import TerminalRepr + +import _pytest +cutdir = py.path.local(_pytest.__file__).dirpath() + +def pytest_addoption(parser): + group = parser.getgroup("general") + group.addoption('--funcargs', + action="store_true", dest="showfuncargs", default=False, + help="show available function arguments, sorted by plugin") + parser.addini("python_files", type="args", + default=('test_*.py', '*_test.py'), + help="glob-style file patterns for Python test module discovery") + parser.addini("python_classes", type="args", default=("Test",), + help="prefixes for Python test class discovery") + parser.addini("python_functions", type="args", default=("test",), + help="prefixes for Python test function and method discovery") + +def pytest_cmdline_main(config): + if config.option.showfuncargs: + showfuncargs(config) + return 0 + +@pytest.mark.trylast +def pytest_namespace(): + raises.Exception = pytest.fail.Exception + return { + 'raises' : raises, + 'collect': { + 'Module': Module, 'Class': Class, 'Instance': Instance, + 'Function': Function, 'Generator': Generator, + '_fillfuncargs': fillfuncargs} + } + +def pytest_funcarg__pytestconfig(request): + """ the pytest config object with access to command line opts.""" + return request.config + +def pytest_pyfunc_call(__multicall__, pyfuncitem): + if not __multicall__.execute(): + testfunction = pyfuncitem.obj + if pyfuncitem._isyieldedfunction(): + testfunction(*pyfuncitem._args) + else: + funcargs = pyfuncitem.funcargs + testfunction(**funcargs) + +def pytest_collect_file(path, parent): + ext = path.ext + pb = path.purebasename + if ext == ".py": + if not parent.session.isinitpath(path): + for pat in parent.config.getini('python_files'): + if path.fnmatch(pat): + break + else: + return + return parent.ihook.pytest_pycollect_makemodule( + path=path, parent=parent) + +def pytest_pycollect_makemodule(path, parent): + return Module(path, parent) + +def pytest_pycollect_makeitem(__multicall__, collector, name, obj): + res = __multicall__.execute() + if res is not None: + return res + if collector._istestclasscandidate(name, obj): + #if hasattr(collector.obj, 'unittest'): + # return # we assume it's a mixin class for a TestCase derived one + return collector.Class(name, parent=collector) + elif collector.funcnamefilter(name) and hasattr(obj, '__call__'): + if is_generator(obj): + return Generator(name, parent=collector) + else: + return collector._genfunctions(name, obj) + +def is_generator(func): + try: + return py.code.getrawcode(func).co_flags & 32 # generator function + except AttributeError: # builtin functions have no bytecode + # assume them to not be generators + return False + +class PyobjMixin(object): + def obj(): + def fget(self): + try: + return self._obj + except AttributeError: + self._obj = obj = self._getobj() + return obj + def fset(self, value): + self._obj = value + return property(fget, fset, None, "underlying python object") + obj = obj() + + def _getobj(self): + return getattr(self.parent.obj, self.name) + + def getmodpath(self, stopatmodule=True, includemodule=False): + """ return python path relative to the containing module. """ + chain = self.listchain() + chain.reverse() + parts = [] + for node in chain: + if isinstance(node, Instance): + continue + name = node.name + if isinstance(node, Module): + assert name.endswith(".py") + name = name[:-3] + if stopatmodule: + if includemodule: + parts.append(name) + break + parts.append(name) + parts.reverse() + s = ".".join(parts) + return s.replace(".[", "[") + + def _getfslineno(self): + try: + return self._fslineno + except AttributeError: + pass + obj = self.obj + # xxx let decorators etc specify a sane ordering + if hasattr(obj, 'place_as'): + obj = obj.place_as + + self._fslineno = py.code.getfslineno(obj) + return self._fslineno + + def reportinfo(self): + # XXX caching? + obj = self.obj + if hasattr(obj, 'compat_co_firstlineno'): + # nose compatibility + fspath = sys.modules[obj.__module__].__file__ + if fspath.endswith(".pyc"): + fspath = fspath[:-1] + #assert 0 + #fn = inspect.getsourcefile(obj) or inspect.getfile(obj) + lineno = obj.compat_co_firstlineno + modpath = obj.__module__ + else: + fspath, lineno = self._getfslineno() + modpath = self.getmodpath() + return fspath, lineno, modpath + +class PyCollectorMixin(PyobjMixin, pytest.Collector): + + def funcnamefilter(self, name): + for prefix in self.config.getini("python_functions"): + if name.startswith(prefix): + return True + + def classnamefilter(self, name): + for prefix in self.config.getini("python_classes"): + if name.startswith(prefix): + return True + + def collect(self): + # NB. we avoid random getattrs and peek in the __dict__ instead + # (XXX originally introduced from a PyPy need, still true?) + dicts = [getattr(self.obj, '__dict__', {})] + for basecls in inspect.getmro(self.obj.__class__): + dicts.append(basecls.__dict__) + seen = {} + l = [] + for dic in dicts: + for name, obj in dic.items(): + if name in seen: + continue + seen[name] = True + if name[0] != "_": + res = self.makeitem(name, obj) + if res is None: + continue + if not isinstance(res, list): + res = [res] + l.extend(res) + l.sort(key=lambda item: item.reportinfo()[:2]) + return l + + def makeitem(self, name, obj): + return self.ihook.pytest_pycollect_makeitem( + collector=self, name=name, obj=obj) + + def _istestclasscandidate(self, name, obj): + if self.classnamefilter(name) and \ + inspect.isclass(obj): + if hasinit(obj): + # XXX WARN + return False + return True + + def _genfunctions(self, name, funcobj): + module = self.getparent(Module).obj + clscol = self.getparent(Class) + cls = clscol and clscol.obj or None + metafunc = Metafunc(funcobj, config=self.config, + cls=cls, module=module) + gentesthook = self.config.hook.pytest_generate_tests + extra = [module] + if cls is not None: + extra.append(cls()) + plugins = self.getplugins() + extra + gentesthook.pcall(plugins, metafunc=metafunc) + if not metafunc._calls: + return self.Function(name, parent=self) + l = [] + for callspec in metafunc._calls: + subname = "%s[%s]" %(name, callspec.id) + function = self.Function(name=subname, parent=self, + callspec=callspec, callobj=funcobj, keywords={callspec.id:True}) + l.append(function) + return l + +class Module(pytest.File, PyCollectorMixin): + def _getobj(self): + return self._memoizedcall('_obj', self._importtestmodule) + + def _importtestmodule(self): + # we assume we are only called once per module + try: + mod = self.fspath.pyimport(ensuresyspath=True) + except SyntaxError: + excinfo = py.code.ExceptionInfo() + raise self.CollectError(excinfo.getrepr(style="short")) + except self.fspath.ImportMismatchError: + e = sys.exc_info()[1] + raise self.CollectError( + "import file mismatch:\n" + "imported module %r has this __file__ attribute:\n" + " %s\n" + "which is not the same as the test file we want to collect:\n" + " %s\n" + "HINT: use a unique basename for your test file modules" + % e.args + ) + #print "imported test module", mod + self.config.pluginmanager.consider_module(mod) + return mod + + def setup(self): + if hasattr(self.obj, 'setup_module'): + #XXX: nose compat hack, move to nose plugin + # if it takes a positional arg, its probably a pytest style one + # so we pass the current module object + if inspect.getargspec(self.obj.setup_module)[0]: + self.obj.setup_module(self.obj) + else: + self.obj.setup_module() + + def teardown(self): + if hasattr(self.obj, 'teardown_module'): + #XXX: nose compat hack, move to nose plugin + # if it takes a positional arg, its probably a py.test style one + # so we pass the current module object + if inspect.getargspec(self.obj.teardown_module)[0]: + self.obj.teardown_module(self.obj) + else: + self.obj.teardown_module() + +class Class(PyCollectorMixin, pytest.Collector): + + def collect(self): + return [self.Instance(name="()", parent=self)] + + def setup(self): + setup_class = getattr(self.obj, 'setup_class', None) + if setup_class is not None: + setup_class = getattr(setup_class, 'im_func', setup_class) + setup_class(self.obj) + + def teardown(self): + teardown_class = getattr(self.obj, 'teardown_class', None) + if teardown_class is not None: + teardown_class = getattr(teardown_class, 'im_func', teardown_class) + teardown_class(self.obj) + +class Instance(PyCollectorMixin, pytest.Collector): + def _getobj(self): + return self.parent.obj() + + def newinstance(self): + self.obj = self._getobj() + return self.obj + +class FunctionMixin(PyobjMixin): + """ mixin for the code common to Function and Generator. + """ + + def setup(self): + """ perform setup for this test function. """ + if inspect.ismethod(self.obj): + name = 'setup_method' + else: + name = 'setup_function' + if hasattr(self, '_preservedparent'): + obj = self._preservedparent + elif isinstance(self.parent, Instance): + obj = self.parent.newinstance() + self.obj = self._getobj() + else: + obj = self.parent.obj + setup_func_or_method = getattr(obj, name, None) + if setup_func_or_method is not None: + setup_func_or_method(self.obj) + + def teardown(self): + """ perform teardown for this test function. """ + if inspect.ismethod(self.obj): + name = 'teardown_method' + else: + name = 'teardown_function' + obj = self.parent.obj + teardown_func_or_meth = getattr(obj, name, None) + if teardown_func_or_meth is not None: + teardown_func_or_meth(self.obj) + + def _prunetraceback(self, excinfo): + if hasattr(self, '_obj') and not self.config.option.fulltrace: + code = py.code.Code(self.obj) + path, firstlineno = code.path, code.firstlineno + traceback = excinfo.traceback + ntraceback = traceback.cut(path=path, firstlineno=firstlineno) + if ntraceback == traceback: + ntraceback = ntraceback.cut(path=path) + if ntraceback == traceback: + ntraceback = ntraceback.cut(excludepath=cutdir) + excinfo.traceback = ntraceback.filter() + + def _repr_failure_py(self, excinfo, style="long"): + if excinfo.errisinstance(FuncargRequest.LookupError): + fspath, lineno, msg = self.reportinfo() + lines, _ = inspect.getsourcelines(self.obj) + for i, line in enumerate(lines): + if line.strip().startswith('def'): + return FuncargLookupErrorRepr(fspath, lineno, + lines[:i+1], str(excinfo.value)) + if excinfo.errisinstance(pytest.fail.Exception): + if not excinfo.value.pytrace: + return str(excinfo.value) + return super(FunctionMixin, self)._repr_failure_py(excinfo, + style=style) + + def repr_failure(self, excinfo, outerr=None): + assert outerr is None, "XXX outerr usage is deprecated" + return self._repr_failure_py(excinfo, + style=self.config.option.tbstyle) + +class FuncargLookupErrorRepr(TerminalRepr): + def __init__(self, filename, firstlineno, deflines, errorstring): + self.deflines = deflines + self.errorstring = errorstring + self.filename = filename + self.firstlineno = firstlineno + + def toterminal(self, tw): + tw.line() + for line in self.deflines: + tw.line(" " + line.strip()) + for line in self.errorstring.split("\n"): + tw.line(" " + line.strip(), red=True) + tw.line() + tw.line("%s:%d" % (self.filename, self.firstlineno+1)) + +class Generator(FunctionMixin, PyCollectorMixin, pytest.Collector): + def collect(self): + # test generators are seen as collectors but they also + # invoke setup/teardown on popular request + # (induced by the common "test_*" naming shared with normal tests) + self.config._setupstate.prepare(self) + # see FunctionMixin.setup and test_setupstate_is_preserved_134 + self._preservedparent = self.parent.obj + l = [] + seen = {} + for i, x in enumerate(self.obj()): + name, call, args = self.getcallargs(x) + if not py.builtin.callable(call): + raise TypeError("%r yielded non callable test %r" %(self.obj, call,)) + if name is None: + name = "[%d]" % i + else: + name = "['%s']" % name + if name in seen: + raise ValueError("%r generated tests with non-unique name %r" %(self, name)) + seen[name] = True + l.append(self.Function(name, self, args=args, callobj=call)) + return l + + def getcallargs(self, obj): + if not isinstance(obj, (tuple, list)): + obj = (obj,) + # explict naming + if isinstance(obj[0], py.builtin._basestring): + name = obj[0] + obj = obj[1:] + else: + name = None + call, args = obj[0], obj[1:] + return name, call, args + + +# +# Test Items +# +_dummy = object() +class Function(FunctionMixin, pytest.Item): + """ a Function Item is responsible for setting up + and executing a Python callable test object. + """ + _genid = None + def __init__(self, name, parent=None, args=None, config=None, + callspec=None, callobj=_dummy, keywords=None, session=None): + super(Function, self).__init__(name, parent, + config=config, session=session) + self._args = args + if self._isyieldedfunction(): + assert not callspec, ( + "yielded functions (deprecated) cannot have funcargs") + else: + if callspec is not None: + self.funcargs = callspec.funcargs or {} + self._genid = callspec.id + if hasattr(callspec, "param"): + self._requestparam = callspec.param + else: + self.funcargs = {} + if callobj is not _dummy: + self._obj = callobj + self.function = getattr(self.obj, 'im_func', self.obj) + self.keywords.update(py.builtin._getfuncdict(self.obj) or {}) + if keywords: + self.keywords.update(keywords) + + def _getobj(self): + name = self.name + i = name.find("[") # parametrization + if i != -1: + name = name[:i] + return getattr(self.parent.obj, name) + + def _isyieldedfunction(self): + return self._args is not None + + def runtest(self): + """ execute the underlying test function. """ + self.ihook.pytest_pyfunc_call(pyfuncitem=self) + + def setup(self): + super(Function, self).setup() + if hasattr(self, 'funcargs'): + fillfuncargs(self) + + def __eq__(self, other): + try: + return (self.name == other.name and + self._args == other._args and + self.parent == other.parent and + self.obj == other.obj and + getattr(self, '_genid', None) == + getattr(other, '_genid', None) + ) + except AttributeError: + pass + return False + + def __ne__(self, other): + return not self == other + + def __hash__(self): + return hash((self.parent, self.name)) + +def hasinit(obj): + init = getattr(obj, '__init__', None) + if init: + if init != object.__init__: + return True + + +def getfuncargnames(function): + # XXX merge with main.py's varnames + argnames = py.std.inspect.getargs(py.code.getrawcode(function))[0] + startindex = py.std.inspect.ismethod(function) and 1 or 0 + defaults = getattr(function, 'func_defaults', + getattr(function, '__defaults__', None)) or () + numdefaults = len(defaults) + if numdefaults: + return argnames[startindex:-numdefaults] + return argnames[startindex:] + +def fillfuncargs(function): + """ fill missing funcargs. """ + request = FuncargRequest(pyfuncitem=function) + request._fillfuncargs() + +_notexists = object() +class CallSpec: + def __init__(self, funcargs, id, param): + self.funcargs = funcargs + self.id = id + if param is not _notexists: + self.param = param + def __repr__(self): + return "<CallSpec id=%r param=%r funcargs=%r>" %( + self.id, getattr(self, 'param', '?'), self.funcargs) + +class Metafunc: + def __init__(self, function, config=None, cls=None, module=None): + self.config = config + self.module = module + self.function = function + self.funcargnames = getfuncargnames(function) + self.cls = cls + self.module = module + self._calls = [] + self._ids = py.builtin.set() + + def addcall(self, funcargs=None, id=_notexists, param=_notexists): + """ add a new call to the underlying test function during the + collection phase of a test run. + + :arg funcargs: argument keyword dictionary used when invoking + the test function. + + :arg id: used for reporting and identification purposes. If you + don't supply an `id` the length of the currently + list of calls to the test function will be used. + + :arg param: will be exposed to a later funcarg factory invocation + through the ``request.param`` attribute. Setting it (instead of + directly providing a ``funcargs`` ditionary) is called + *indirect parametrization*. Indirect parametrization is + preferable if test values are expensive to setup or can + only be created after certain fixtures or test-run related + initialization code has been run. + """ + assert funcargs is None or isinstance(funcargs, dict) + if id is None: + raise ValueError("id=None not allowed") + if id is _notexists: + id = len(self._calls) + id = str(id) + if id in self._ids: + raise ValueError("duplicate id %r" % id) + self._ids.add(id) + self._calls.append(CallSpec(funcargs, id, param)) + +class FuncargRequest: + """ A request for function arguments from a test function. """ + _argprefix = "pytest_funcarg__" + _argname = None + + class LookupError(LookupError): + """ error on performing funcarg request. """ + + def __init__(self, pyfuncitem): + self._pyfuncitem = pyfuncitem + if hasattr(pyfuncitem, '_requestparam'): + self.param = pyfuncitem._requestparam + extra = [obj for obj in (self.module, self.instance) if obj] + self._plugins = pyfuncitem.getplugins() + extra + self._funcargs = self._pyfuncitem.funcargs.copy() + self._name2factory = {} + self._currentarg = None + + @property + def function(self): + """ function object of the test invocation. """ + return self._pyfuncitem.obj + + @property + def keywords(self): + """ keywords of the test function item. + + .. versionadded:: 2.0 + """ + return self._pyfuncitem.keywords + + @property + def module(self): + """ module where the test function was collected. """ + return self._pyfuncitem.getparent(pytest.Module).obj + + @property + def cls(self): + """ class (can be None) where the test function was collected. """ + clscol = self._pyfuncitem.getparent(pytest.Class) + if clscol: + return clscol.obj + @property + def instance(self): + """ instance (can be None) on which test function was collected. """ + return py.builtin._getimself(self.function) + + @property + def config(self): + """ the pytest config object associated with this request. """ + return self._pyfuncitem.config + + @property + def fspath(self): + """ the file system path of the test module which collected this test. """ + return self._pyfuncitem.fspath + + def _fillfuncargs(self): + argnames = getfuncargnames(self.function) + if argnames: + assert not getattr(self._pyfuncitem, '_args', None), ( + "yielded functions cannot have funcargs") + for argname in argnames: + if argname not in self._pyfuncitem.funcargs: + self._pyfuncitem.funcargs[argname] = self.getfuncargvalue(argname) + + + def applymarker(self, marker): + """ apply a marker to a single test function invocation. + This method is useful if you don't want to have a keyword/marker + on all function invocations. + + :arg marker: a :py:class:`_pytest.mark.MarkDecorator` object + created by a call to ``py.test.mark.NAME(...)``. + """ + if not isinstance(marker, py.test.mark.XYZ.__class__): + raise ValueError("%r is not a py.test.mark.* object") + self._pyfuncitem.keywords[marker.markname] = marker + + def cached_setup(self, setup, teardown=None, scope="module", extrakey=None): + """ return a testing resource managed by ``setup`` & + ``teardown`` calls. ``scope`` and ``extrakey`` determine when the + ``teardown`` function will be called so that subsequent calls to + ``setup`` would recreate the resource. + + :arg teardown: function receiving a previously setup resource. + :arg setup: a no-argument function creating a resource. + :arg scope: a string value out of ``function``, ``class``, ``module`` + or ``session`` indicating the caching lifecycle of the resource. + :arg extrakey: added to internal caching key of (funcargname, scope). + """ + if not hasattr(self.config, '_setupcache'): + self.config._setupcache = {} # XXX weakref? + cachekey = (self._currentarg, self._getscopeitem(scope), extrakey) + cache = self.config._setupcache + try: + val = cache[cachekey] + except KeyError: + val = setup() + cache[cachekey] = val + if teardown is not None: + def finalizer(): + del cache[cachekey] + teardown(val) + self._addfinalizer(finalizer, scope=scope) + return val + + def getfuncargvalue(self, argname): + """ Retrieve a function argument by name for this test + function invocation. This allows one function argument factory + to call another function argument factory. If there are two + funcarg factories for the same test function argument the first + factory may use ``getfuncargvalue`` to call the second one and + do something additional with the resource. + """ + try: + return self._funcargs[argname] + except KeyError: + pass + if argname not in self._name2factory: + self._name2factory[argname] = self.config.pluginmanager.listattr( + plugins=self._plugins, + attrname=self._argprefix + str(argname) + ) + #else: we are called recursively + if not self._name2factory[argname]: + self._raiselookupfailed(argname) + funcargfactory = self._name2factory[argname].pop() + oldarg = self._currentarg + self._currentarg = argname + try: + self._funcargs[argname] = res = funcargfactory(request=self) + finally: + self._currentarg = oldarg + return res + + def _getscopeitem(self, scope): + if scope == "function": + return self._pyfuncitem + elif scope == "session": + return None + elif scope == "class": + x = self._pyfuncitem.getparent(pytest.Class) + if x is not None: + return x + scope = "module" + if scope == "module": + return self._pyfuncitem.getparent(pytest.Module) + raise ValueError("unknown finalization scope %r" %(scope,)) + + def addfinalizer(self, finalizer): + """add finalizer function to be called after test function + finished execution. """ + self._addfinalizer(finalizer, scope="function") + + def _addfinalizer(self, finalizer, scope): + colitem = self._getscopeitem(scope) + self.config._setupstate.addfinalizer( + finalizer=finalizer, colitem=colitem) + + def __repr__(self): + return "<FuncargRequest for %r>" %(self._pyfuncitem) + + def _raiselookupfailed(self, argname): + available = [] + for plugin in self._plugins: + for name in vars(plugin): + if name.startswith(self._argprefix): + name = name[len(self._argprefix):] + if name not in available: + available.append(name) + fspath, lineno, msg = self._pyfuncitem.reportinfo() + msg = "LookupError: no factory found for function argument %r" % (argname,) + msg += "\n available funcargs: %s" %(", ".join(available),) + msg += "\n use 'py.test --funcargs [testpath]' for help on them." + raise self.LookupError(msg) + +def showfuncargs(config): + from _pytest.main import Session + session = Session(config) + session.perform_collect() + if session.items: + plugins = session.items[0].getplugins() + else: + plugins = session.getplugins() + curdir = py.path.local() + tw = py.io.TerminalWriter() + verbose = config.getvalue("verbose") + for plugin in plugins: + available = [] + for name, factory in vars(plugin).items(): + if name.startswith(FuncargRequest._argprefix): + name = name[len(FuncargRequest._argprefix):] + if name not in available: + available.append([name, factory]) + if available: + pluginname = plugin.__name__ + for name, factory in available: + loc = getlocation(factory, curdir) + if verbose: + funcargspec = "%s -- %s" %(name, loc,) + else: + funcargspec = name + tw.line(funcargspec, green=True) + doc = factory.__doc__ or "" + if doc: + for line in doc.split("\n"): + tw.line(" " + line.strip()) + else: + tw.line(" %s: no docstring available" %(loc,), + red=True) + +def getlocation(function, curdir): + import inspect + fn = py.path.local(inspect.getfile(function)) + lineno = py.builtin._getcode(function).co_firstlineno + if fn.relto(curdir): + fn = fn.relto(curdir) + return "%s:%d" %(fn, lineno+1) + +# builtin pytest.raises helper + +def raises(ExpectedException, *args, **kwargs): + """ assert that a code block/function call raises @ExpectedException + and raise a failure exception otherwise. + + If using Python 2.5 or above, you may use this function as a + context manager:: + + >>> with raises(ZeroDivisionError): + ... 1/0 + + Or you can specify a callable by passing a to-be-called lambda:: + + >>> raises(ZeroDivisionError, lambda: 1/0) + <ExceptionInfo ...> + + or you can specify an arbitrary callable with arguments:: + + >>> def f(x): return 1/x + ... + >>> raises(ZeroDivisionError, f, 0) + <ExceptionInfo ...> + >>> raises(ZeroDivisionError, f, x=0) + <ExceptionInfo ...> + + A third possibility is to use a string which which will + be executed:: + + >>> raises(ZeroDivisionError, "f(0)") + <ExceptionInfo ...> + """ + __tracebackhide__ = True + + if not args: + return RaisesContext(ExpectedException) + elif isinstance(args[0], str): + code, = args + assert isinstance(code, str) + frame = sys._getframe(1) + loc = frame.f_locals.copy() + loc.update(kwargs) + #print "raises frame scope: %r" % frame.f_locals + try: + code = py.code.Source(code).compile() + py.builtin.exec_(code, frame.f_globals, loc) + # XXX didn'T mean f_globals == f_locals something special? + # this is destroyed here ... + except ExpectedException: + return py.code.ExceptionInfo() + else: + func = args[0] + try: + func(*args[1:], **kwargs) + except ExpectedException: + return py.code.ExceptionInfo() + k = ", ".join(["%s=%r" % x for x in kwargs.items()]) + if k: + k = ', ' + k + expr = '%s(%r%s)' %(getattr(func, '__name__', func), args, k) + pytest.fail("DID NOT RAISE") + +class RaisesContext(object): + def __init__(self, ExpectedException): + self.ExpectedException = ExpectedException + self.excinfo = None + + def __enter__(self): + self.excinfo = object.__new__(py.code.ExceptionInfo) + return self.excinfo + + def __exit__(self, *tp): + __tracebackhide__ = True + if tp[0] is None: + pytest.fail("DID NOT RAISE") + self.excinfo.__init__(tp) + return issubclass(self.excinfo.type, self.ExpectedException) + diff --git a/_pytest/recwarn.py b/_pytest/recwarn.py new file mode 100644 index 0000000000..e2fb2b17e4 --- /dev/null +++ b/_pytest/recwarn.py @@ -0,0 +1,96 @@ +""" recording warnings during test function execution. """ + +import py +import sys, os + +def pytest_funcarg__recwarn(request): + """Return a WarningsRecorder instance that provides these methods: + + * ``pop(category=None)``: return last warning matching the category. + * ``clear()``: clear list of warnings + """ + if sys.version_info >= (2,7): + import warnings + oldfilters = warnings.filters[:] + warnings.simplefilter('default') + def reset_filters(): + warnings.filters[:] = oldfilters + request.addfinalizer(reset_filters) + wrec = WarningsRecorder() + request.addfinalizer(wrec.finalize) + return wrec + +def pytest_namespace(): + return {'deprecated_call': deprecated_call} + +def deprecated_call(func, *args, **kwargs): + """ assert that calling ``func(*args, **kwargs)`` + triggers a DeprecationWarning. + """ + warningmodule = py.std.warnings + l = [] + oldwarn_explicit = getattr(warningmodule, 'warn_explicit') + def warn_explicit(*args, **kwargs): + l.append(args) + oldwarn_explicit(*args, **kwargs) + oldwarn = getattr(warningmodule, 'warn') + def warn(*args, **kwargs): + l.append(args) + oldwarn(*args, **kwargs) + + warningmodule.warn_explicit = warn_explicit + warningmodule.warn = warn + try: + ret = func(*args, **kwargs) + finally: + warningmodule.warn_explicit = warn_explicit + warningmodule.warn = warn + if not l: + #print warningmodule + __tracebackhide__ = True + raise AssertionError("%r did not produce DeprecationWarning" %(func,)) + return ret + + +class RecordedWarning: + def __init__(self, message, category, filename, lineno, line): + self.message = message + self.category = category + self.filename = filename + self.lineno = lineno + self.line = line + +class WarningsRecorder: + def __init__(self): + warningmodule = py.std.warnings + self.list = [] + def showwarning(message, category, filename, lineno, line=0): + self.list.append(RecordedWarning( + message, category, filename, lineno, line)) + try: + self.old_showwarning(message, category, + filename, lineno, line=line) + except TypeError: + # < python2.6 + self.old_showwarning(message, category, filename, lineno) + self.old_showwarning = warningmodule.showwarning + warningmodule.showwarning = showwarning + + def pop(self, cls=Warning): + """ pop the first recorded warning, raise exception if not exists.""" + for i, w in enumerate(self.list): + if issubclass(w.category, cls): + return self.list.pop(i) + __tracebackhide__ = True + assert 0, "%r not found in %r" %(cls, self.list) + + #def resetregistry(self): + # import warnings + # warnings.onceregistry.clear() + # warnings.__warningregistry__.clear() + + def clear(self): + self.list[:] = [] + + def finalize(self): + py.std.warnings.showwarning = self.old_showwarning diff --git a/_pytest/resultlog.py b/_pytest/resultlog.py new file mode 100644 index 0000000000..7f879cce59 --- /dev/null +++ b/_pytest/resultlog.py @@ -0,0 +1,93 @@ +""" (disabled by default) create result information in a plain text file. """ + +import py + +def pytest_addoption(parser): + group = parser.getgroup("terminal reporting", "resultlog plugin options") + group.addoption('--resultlog', action="store", dest="resultlog", + metavar="path", default=None, + help="path for machine-readable result log.") + +def pytest_configure(config): + resultlog = config.option.resultlog + # prevent opening resultlog on slave nodes (xdist) + if resultlog and not hasattr(config, 'slaveinput'): + logfile = open(resultlog, 'w', 1) # line buffered + config._resultlog = ResultLog(config, logfile) + config.pluginmanager.register(config._resultlog) + +def pytest_unconfigure(config): + resultlog = getattr(config, '_resultlog', None) + if resultlog: + resultlog.logfile.close() + del config._resultlog + config.pluginmanager.unregister(resultlog) + +def generic_path(item): + chain = item.listchain() + gpath = [chain[0].name] + fspath = chain[0].fspath + fspart = False + for node in chain[1:]: + newfspath = node.fspath + if newfspath == fspath: + if fspart: + gpath.append(':') + fspart = False + else: + gpath.append('.') + else: + gpath.append('/') + fspart = True + name = node.name + if name[0] in '([': + gpath.pop() + gpath.append(name) + fspath = newfspath + return ''.join(gpath) + +class ResultLog(object): + def __init__(self, config, logfile): + self.config = config + self.logfile = logfile # preferably line buffered + + def write_log_entry(self, testpath, lettercode, longrepr): + py.builtin.print_("%s %s" % (lettercode, testpath), file=self.logfile) + for line in longrepr.splitlines(): + py.builtin.print_(" %s" % line, file=self.logfile) + + def log_outcome(self, report, lettercode, longrepr): + testpath = getattr(report, 'nodeid', None) + if testpath is None: + testpath = report.fspath + self.write_log_entry(testpath, lettercode, longrepr) + + def pytest_runtest_logreport(self, report): + res = self.config.hook.pytest_report_teststatus(report=report) + code = res[1] + if code == 'x': + longrepr = str(report.longrepr) + elif code == 'X': + longrepr = '' + elif report.passed: + longrepr = "" + elif report.failed: + longrepr = str(report.longrepr) + elif report.skipped: + longrepr = str(report.longrepr[2]) + self.log_outcome(report, code, longrepr) + + def pytest_collectreport(self, report): + if not report.passed: + if report.failed: + code = "F" + longrepr = str(report.longrepr.reprcrash) + else: + assert report.skipped + code = "S" + longrepr = "%s:%d: %s" % report.longrepr + self.log_outcome(report, code, longrepr) + + def pytest_internalerror(self, excrepr): + path = excrepr.reprcrash.path + self.write_log_entry(path, '!', str(excrepr)) diff --git a/_pytest/runner.py b/_pytest/runner.py new file mode 100644 index 0000000000..08cdc96f31 --- /dev/null +++ b/_pytest/runner.py @@ -0,0 +1,390 @@ +""" basic collect and runtest protocol implementations """ + +import py, sys +from py._code.code import TerminalRepr + +def pytest_namespace(): + return { + 'fail' : fail, + 'skip' : skip, + 'importorskip' : importorskip, + 'exit' : exit, + } + +# +# pytest plugin hooks + +# XXX move to pytest_sessionstart and fix py.test owns tests +def pytest_configure(config): + config._setupstate = SetupState() + +def pytest_sessionfinish(session, exitstatus): + if hasattr(session.config, '_setupstate'): + hook = session.config.hook + rep = hook.pytest__teardown_final(session=session) + if rep: + hook.pytest__teardown_final_logerror(session=session, report=rep) + session.exitstatus = 1 + +class NodeInfo: + def __init__(self, location): + self.location = location + +def pytest_runtest_protocol(item): + item.ihook.pytest_runtest_logstart( + nodeid=item.nodeid, location=item.location, + ) + runtestprotocol(item) + return True + +def runtestprotocol(item, log=True): + rep = call_and_report(item, "setup", log) + reports = [rep] + if rep.passed: + reports.append(call_and_report(item, "call", log)) + reports.append(call_and_report(item, "teardown", log)) + return reports + +def pytest_runtest_setup(item): + item.config._setupstate.prepare(item) + +def pytest_runtest_call(item): + item.runtest() + +def pytest_runtest_teardown(item): + item.config._setupstate.teardown_exact(item) + +def pytest__teardown_final(session): + call = CallInfo(session.config._setupstate.teardown_all, when="teardown") + if call.excinfo: + ntraceback = call.excinfo.traceback .cut(excludepath=py._pydir) + call.excinfo.traceback = ntraceback.filter() + longrepr = call.excinfo.getrepr(funcargs=True) + return TeardownErrorReport(longrepr) + +def pytest_report_teststatus(report): + if report.when in ("setup", "teardown"): + if report.failed: + # category, shortletter, verbose-word + return "error", "E", "ERROR" + elif report.skipped: + return "skipped", "s", "SKIPPED" + else: + return "", "", "" + + +# +# Implementation + +def call_and_report(item, when, log=True): + call = call_runtest_hook(item, when) + hook = item.ihook + report = hook.pytest_runtest_makereport(item=item, call=call) + if log and (when == "call" or not report.passed): + hook.pytest_runtest_logreport(report=report) + return report + +def call_runtest_hook(item, when): + hookname = "pytest_runtest_" + when + ihook = getattr(item.ihook, hookname) + return CallInfo(lambda: ihook(item=item), when=when) + +class CallInfo: + """ Result/Exception info a function invocation. """ + #: None or ExceptionInfo object. + excinfo = None + def __init__(self, func, when): + #: context of invocation: one of "setup", "call", + #: "teardown", "memocollect" + self.when = when + try: + self.result = func() + except KeyboardInterrupt: + raise + except: + self.excinfo = py.code.ExceptionInfo() + + def __repr__(self): + if self.excinfo: + status = "exception: %s" % str(self.excinfo.value) + else: + status = "result: %r" % (self.result,) + return "<CallInfo when=%r %s>" % (self.when, status) + +def getslaveinfoline(node): + try: + return node._slaveinfocache + except AttributeError: + d = node.slaveinfo + ver = "%s.%s.%s" % d['version_info'][:3] + node._slaveinfocache = s = "[%s] %s -- Python %s %s" % ( + d['id'], d['sysplatform'], ver, d['executable']) + return s + +class BaseReport(object): + def toterminal(self, out): + longrepr = self.longrepr + if hasattr(self, 'node'): + out.line(getslaveinfoline(self.node)) + if hasattr(longrepr, 'toterminal'): + longrepr.toterminal(out) + else: + out.line(str(longrepr)) + + passed = property(lambda x: x.outcome == "passed") + failed = property(lambda x: x.outcome == "failed") + skipped = property(lambda x: x.outcome == "skipped") + + @property + def fspath(self): + return self.nodeid.split("::")[0] + +def pytest_runtest_makereport(item, call): + when = call.when + keywords = dict([(x,1) for x in item.keywords]) + excinfo = call.excinfo + if not call.excinfo: + outcome = "passed" + longrepr = None + else: + excinfo = call.excinfo + if not isinstance(excinfo, py.code.ExceptionInfo): + outcome = "failed" + longrepr = excinfo + elif excinfo.errisinstance(py.test.skip.Exception): + outcome = "skipped" + r = item._repr_failure_py(excinfo, "line").reprcrash + longrepr = (str(r.path), r.lineno, r.message) + else: + outcome = "failed" + if call.when == "call": + longrepr = item.repr_failure(excinfo) + else: # exception in setup or teardown + longrepr = item._repr_failure_py(excinfo) + return TestReport(item.nodeid, item.location, + keywords, outcome, longrepr, when) + +class TestReport(BaseReport): + """ Basic test report object (also used for setup and teardown calls if + they fail). + """ + def __init__(self, nodeid, location, + keywords, outcome, longrepr, when): + #: normalized collection node id + self.nodeid = nodeid + + #: a (filesystempath, lineno, domaininfo) tuple indicating the + #: actual location of a test item - it might be different from the + #: collected one e.g. if a method is inherited from a different module. + self.location = location + + #: a name -> value dictionary containing all keywords and + #: markers associated with a test invocation. + self.keywords = keywords + + #: test outcome, always one of "passed", "failed", "skipped". + self.outcome = outcome + + #: None or a failure representation. + self.longrepr = longrepr + + #: one of 'setup', 'call', 'teardown' to indicate runtest phase. + self.when = when + + def __repr__(self): + return "<TestReport %r when=%r outcome=%r>" % ( + self.nodeid, self.when, self.outcome) + +class TeardownErrorReport(BaseReport): + outcome = "failed" + when = "teardown" + def __init__(self, longrepr): + self.longrepr = longrepr + +def pytest_make_collect_report(collector): + call = CallInfo(collector._memocollect, "memocollect") + longrepr = None + if not call.excinfo: + outcome = "passed" + else: + if call.excinfo.errisinstance(py.test.skip.Exception): + outcome = "skipped" + r = collector._repr_failure_py(call.excinfo, "line").reprcrash + longrepr = (str(r.path), r.lineno, r.message) + else: + outcome = "failed" + errorinfo = collector.repr_failure(call.excinfo) + if not hasattr(errorinfo, "toterminal"): + errorinfo = CollectErrorRepr(errorinfo) + longrepr = errorinfo + return CollectReport(collector.nodeid, outcome, longrepr, + getattr(call, 'result', None)) + +class CollectReport(BaseReport): + def __init__(self, nodeid, outcome, longrepr, result): + self.nodeid = nodeid + self.outcome = outcome + self.longrepr = longrepr + self.result = result or [] + + @property + def location(self): + return (self.fspath, None, self.fspath) + + def __repr__(self): + return "<CollectReport %r lenresult=%s outcome=%r>" % ( + self.nodeid, len(self.result), self.outcome) + +class CollectErrorRepr(TerminalRepr): + def __init__(self, msg): + self.longrepr = msg + def toterminal(self, out): + out.line(str(self.longrepr), red=True) + +class SetupState(object): + """ shared state for setting up/tearing down test items or collectors. """ + def __init__(self): + self.stack = [] + self._finalizers = {} + + def addfinalizer(self, finalizer, colitem): + """ attach a finalizer to the given colitem. + if colitem is None, this will add a finalizer that + is called at the end of teardown_all(). + """ + assert hasattr(finalizer, '__call__') + #assert colitem in self.stack + self._finalizers.setdefault(colitem, []).append(finalizer) + + def _pop_and_teardown(self): + colitem = self.stack.pop() + self._teardown_with_finalization(colitem) + + def _callfinalizers(self, colitem): + finalizers = self._finalizers.pop(colitem, None) + while finalizers: + fin = finalizers.pop() + fin() + + def _teardown_with_finalization(self, colitem): + self._callfinalizers(colitem) + if colitem: + colitem.teardown() + for colitem in self._finalizers: + assert colitem is None or colitem in self.stack + + def teardown_all(self): + while self.stack: + self._pop_and_teardown() + self._teardown_with_finalization(None) + assert not self._finalizers + + def teardown_exact(self, item): + if self.stack and item == self.stack[-1]: + self._pop_and_teardown() + else: + self._callfinalizers(item) + + def prepare(self, colitem): + """ setup objects along the collector chain to the test-method + and teardown previously setup objects.""" + needed_collectors = colitem.listchain() + while self.stack: + if self.stack == needed_collectors[:len(self.stack)]: + break + self._pop_and_teardown() + # check if the last collection node has raised an error + for col in self.stack: + if hasattr(col, '_prepare_exc'): + py.builtin._reraise(*col._prepare_exc) + for col in needed_collectors[len(self.stack):]: + self.stack.append(col) + try: + col.setup() + except Exception: + col._prepare_exc = sys.exc_info() + raise + +# ============================================================= +# Test OutcomeExceptions and helpers for creating them. + + +class OutcomeException(Exception): + """ OutcomeException and its subclass instances indicate and + contain info about test and collection outcomes. + """ + def __init__(self, msg=None, pytrace=True): + self.msg = msg + self.pytrace = pytrace + + def __repr__(self): + if self.msg: + return str(self.msg) + return "<%s instance>" %(self.__class__.__name__,) + __str__ = __repr__ + +class Skipped(OutcomeException): + # XXX hackish: on 3k we fake to live in the builtins + # in order to have Skipped exception printing shorter/nicer + __module__ = 'builtins' + +class Failed(OutcomeException): + """ raised from an explicit call to py.test.fail() """ + __module__ = 'builtins' + +class Exit(KeyboardInterrupt): + """ raised for immediate program exits (no tracebacks/summaries)""" + def __init__(self, msg="unknown reason"): + self.msg = msg + KeyboardInterrupt.__init__(self, msg) + +# exposed helper methods + +def exit(msg): + """ exit testing process as if KeyboardInterrupt was triggered. """ + __tracebackhide__ = True + raise Exit(msg) + +exit.Exception = Exit + +def skip(msg=""): + """ skip an executing test with the given message. Note: it's usually + better to use the py.test.mark.skipif marker to declare a test to be + skipped under certain conditions like mismatching platforms or + dependencies. See the pytest_skipping plugin for details. + """ + __tracebackhide__ = True + raise Skipped(msg=msg) +skip.Exception = Skipped + +def fail(msg="", pytrace=True): + """ explicitely fail an currently-executing test with the given Message. + if @pytrace is not True the msg represents the full failure information. + """ + __tracebackhide__ = True + raise Failed(msg=msg, pytrace=pytrace) +fail.Exception = Failed + + +def importorskip(modname, minversion=None): + """ return imported module if it has a higher __version__ than the + optionally specified 'minversion' - otherwise call py.test.skip() + with a message detailing the mismatch. + """ + __tracebackhide__ = True + compile(modname, '', 'eval') # to catch syntaxerrors + try: + mod = __import__(modname, None, None, ['__doc__']) + except ImportError: + py.test.skip("could not import %r" %(modname,)) + if minversion is None: + return mod + verattr = getattr(mod, '__version__', None) + if isinstance(minversion, str): + minver = minversion.split(".") + else: + minver = list(minversion) + if verattr is None or verattr.split(".") < minver: + py.test.skip("module %r has __version__ %r, required is: %r" %( + modname, verattr, minversion)) + return mod diff --git a/_pytest/skipping.py b/_pytest/skipping.py new file mode 100644 index 0000000000..106c8518c3 --- /dev/null +++ b/_pytest/skipping.py @@ -0,0 +1,213 @@ +""" support for skip/xfail functions and markers. """ + +import py, pytest + +def pytest_addoption(parser): + group = parser.getgroup("general") + group.addoption('--runxfail', + action="store_true", dest="runxfail", default=False, + help="run tests even if they are marked xfail") + +def pytest_namespace(): + return dict(xfail=xfail) + +class XFailed(pytest.fail.Exception): + """ raised from an explicit call to py.test.xfail() """ + +def xfail(reason=""): + """ xfail an executing test or setup functions with the given reason.""" + __tracebackhide__ = True + raise XFailed(reason) +xfail.Exception = XFailed + +class MarkEvaluator: + def __init__(self, item, name): + self.item = item + self.name = name + + @property + def holder(self): + return self.item.keywords.get(self.name, None) + def __bool__(self): + return bool(self.holder) + __nonzero__ = __bool__ + + def istrue(self): + if self.holder: + d = {'os': py.std.os, 'sys': py.std.sys, 'config': self.item.config} + if self.holder.args: + self.result = False + for expr in self.holder.args: + self.expr = expr + if isinstance(expr, str): + result = cached_eval(self.item.config, expr, d) + else: + result = expr + if result: + self.result = True + self.expr = expr + break + else: + self.result = True + return getattr(self, 'result', False) + + def get(self, attr, default=None): + return self.holder.kwargs.get(attr, default) + + def getexplanation(self): + expl = self.get('reason', None) + if not expl: + if not hasattr(self, 'expr'): + return "" + else: + return "condition: " + self.expr + return expl + + +def pytest_runtest_setup(item): + if not isinstance(item, pytest.Function): + return + evalskip = MarkEvaluator(item, 'skipif') + if evalskip.istrue(): + py.test.skip(evalskip.getexplanation()) + item._evalxfail = MarkEvaluator(item, 'xfail') + check_xfail_no_run(item) + +def pytest_pyfunc_call(pyfuncitem): + check_xfail_no_run(pyfuncitem) + +def check_xfail_no_run(item): + if not item.config.option.runxfail: + evalxfail = item._evalxfail + if evalxfail.istrue(): + if not evalxfail.get('run', True): + py.test.xfail("[NOTRUN] " + evalxfail.getexplanation()) + +def pytest_runtest_makereport(__multicall__, item, call): + if not isinstance(item, pytest.Function): + return + if not (call.excinfo and + call.excinfo.errisinstance(py.test.xfail.Exception)): + evalxfail = getattr(item, '_evalxfail', None) + if not evalxfail: + return + if call.excinfo and call.excinfo.errisinstance(py.test.xfail.Exception): + if not item.config.getvalue("runxfail"): + rep = __multicall__.execute() + rep.keywords['xfail'] = "reason: " + call.excinfo.value.msg + rep.outcome = "skipped" + return rep + rep = __multicall__.execute() + evalxfail = item._evalxfail + if not item.config.option.runxfail and evalxfail.istrue(): + if call.excinfo: + rep.outcome = "skipped" + rep.keywords['xfail'] = evalxfail.getexplanation() + elif call.when == "call": + rep.outcome = "failed" + rep.keywords['xfail'] = evalxfail.getexplanation() + else: + if 'xfail' in rep.keywords: + del rep.keywords['xfail'] + return rep + +# called by terminalreporter progress reporting +def pytest_report_teststatus(report): + if 'xfail' in report.keywords: + if report.skipped: + return "xfailed", "x", "xfail" + elif report.failed: + return "xpassed", "X", "XPASS" + +# called by the terminalreporter instance/plugin +def pytest_terminal_summary(terminalreporter): + tr = terminalreporter + if not tr.reportchars: + #for name in "xfailed skipped failed xpassed": + # if not tr.stats.get(name, 0): + # tr.write_line("HINT: use '-r' option to see extra " + # "summary info about tests") + # break + return + + lines = [] + for char in tr.reportchars: + if char == "x": + show_xfailed(terminalreporter, lines) + elif char == "X": + show_xpassed(terminalreporter, lines) + elif char in "fF": + show_failed(terminalreporter, lines) + elif char in "sS": + show_skipped(terminalreporter, lines) + if lines: + tr._tw.sep("=", "short test summary info") + for line in lines: + tr._tw.line(line) + +def show_failed(terminalreporter, lines): + tw = terminalreporter._tw + failed = terminalreporter.stats.get("failed") + if failed: + for rep in failed: + pos = rep.nodeid + lines.append("FAIL %s" %(pos, )) + +def show_xfailed(terminalreporter, lines): + xfailed = terminalreporter.stats.get("xfailed") + if xfailed: + for rep in xfailed: + pos = rep.nodeid + reason = rep.keywords['xfail'] + lines.append("XFAIL %s" % (pos,)) + if reason: + lines.append(" " + str(reason)) + +def show_xpassed(terminalreporter, lines): + xpassed = terminalreporter.stats.get("xpassed") + if xpassed: + for rep in xpassed: + pos = rep.nodeid + reason = rep.keywords['xfail'] + lines.append("XPASS %s %s" %(pos, reason)) + +def cached_eval(config, expr, d): + if not hasattr(config, '_evalcache'): + config._evalcache = {} + try: + return config._evalcache[expr] + except KeyError: + #import sys + #print >>sys.stderr, ("cache-miss: %r" % expr) + config._evalcache[expr] = x = eval(expr, d) + return x + + +def folded_skips(skipped): + d = {} + for event in skipped: + key = event.longrepr + assert len(key) == 3, (event, key) + d.setdefault(key, []).append(event) + l = [] + for key, events in d.items(): + l.append((len(events),) + key) + return l + +def show_skipped(terminalreporter, lines): + tr = terminalreporter + skipped = tr.stats.get('skipped', []) + if skipped: + #if not tr.hasopt('skipped'): + # tr.write_line( + # "%d skipped tests, specify -rs for more info" % + # len(skipped)) + # return + fskips = folded_skips(skipped) + if fskips: + #tr.write_sep("_", "skipped test summary") + for num, fspath, lineno, reason in fskips: + if reason.startswith("Skipped: "): + reason = reason[9:] + lines.append("SKIP [%d] %s:%d: %s" % + (num, fspath, lineno, reason)) diff --git a/_pytest/standalonetemplate.py b/_pytest/standalonetemplate.py new file mode 100755 index 0000000000..5aaceaa9ee --- /dev/null +++ b/_pytest/standalonetemplate.py @@ -0,0 +1,63 @@ +#! /usr/bin/env python + +sources = """ +@SOURCES@""" + +import sys +import base64 +import zlib +import imp + +class DictImporter(object): + def __init__(self, sources): + self.sources = sources + + def find_module(self, fullname, path=None): + if fullname in self.sources: + return self + if fullname + '.__init__' in self.sources: + return self + return None + + def load_module(self, fullname): + # print "load_module:", fullname + from types import ModuleType + try: + s = self.sources[fullname] + is_pkg = False + except KeyError: + s = self.sources[fullname + '.__init__'] + is_pkg = True + + co = compile(s, fullname, 'exec') + module = sys.modules.setdefault(fullname, ModuleType(fullname)) + module.__file__ = "%s/%s" % (__file__, fullname) + module.__loader__ = self + if is_pkg: + module.__path__ = [fullname] + + do_exec(co, module.__dict__) + return sys.modules[fullname] + + def get_source(self, name): + res = self.sources.get(name) + if res is None: + res = self.sources.get(name + '.__init__') + return res + +if __name__ == "__main__": + if sys.version_info >= (3, 0): + exec("def do_exec(co, loc): exec(co, loc)\n") + import pickle + sources = sources.encode("ascii") # ensure bytes + sources = pickle.loads(zlib.decompress(base64.decodebytes(sources))) + else: + import cPickle as pickle + exec("def do_exec(co, loc): exec co in loc\n") + sources = pickle.loads(zlib.decompress(base64.decodestring(sources))) + + importer = DictImporter(sources) + sys.meta_path.append(importer) + + entry = "@ENTRY@" + do_exec(entry, locals()) diff --git a/_pytest/terminal.py b/_pytest/terminal.py new file mode 100644 index 0000000000..0b0ab1ee2f --- /dev/null +++ b/_pytest/terminal.py @@ -0,0 +1,467 @@ +""" terminal reporting of the full testing process. + +This is a good source for looking at the various reporting hooks. +""" +import pytest, py +import sys +import os + +def pytest_addoption(parser): + group = parser.getgroup("terminal reporting", "reporting", after="general") + group._addoption('-v', '--verbose', action="count", + dest="verbose", default=0, help="increase verbosity."), + group._addoption('-q', '--quiet', action="count", + dest="quiet", default=0, help="decreate verbosity."), + group._addoption('-r', + action="store", dest="reportchars", default=None, metavar="chars", + help="show extra test summary info as specified by chars (f)ailed, " + "(s)skipped, (x)failed, (X)passed.") + group._addoption('-l', '--showlocals', + action="store_true", dest="showlocals", default=False, + help="show locals in tracebacks (disabled by default).") + group._addoption('--report', + action="store", dest="report", default=None, metavar="opts", + help="(deprecated, use -r)") + group._addoption('--tb', metavar="style", + action="store", dest="tbstyle", default='long', + type="choice", choices=['long', 'short', 'no', 'line', 'native'], + help="traceback print mode (long/short/line/no).") + group._addoption('--fulltrace', + action="store_true", dest="fulltrace", default=False, + help="don't cut any tracebacks (default is to cut).") + +def pytest_configure(config): + config.option.verbose -= config.option.quiet + if config.option.collectonly: + reporter = CollectonlyReporter(config) + else: + # we try hard to make printing resilient against + # later changes on FD level. + stdout = py.std.sys.stdout + if hasattr(os, 'dup') and hasattr(stdout, 'fileno'): + try: + newfd = os.dup(stdout.fileno()) + #print "got newfd", newfd + except ValueError: + pass + else: + stdout = os.fdopen(newfd, stdout.mode, 1) + config._toclose = stdout + reporter = TerminalReporter(config, stdout) + config.pluginmanager.register(reporter, 'terminalreporter') + if config.option.debug or config.option.traceconfig: + def mywriter(tags, args): + msg = " ".join(map(str, args)) + reporter.write_line("[traceconfig] " + msg) + config.trace.root.setprocessor("pytest:config", mywriter) + +def pytest_unconfigure(config): + if hasattr(config, '_toclose'): + #print "closing", config._toclose, config._toclose.fileno() + config._toclose.close() + +def getreportopt(config): + reportopts = "" + optvalue = config.option.report + if optvalue: + py.builtin.print_("DEPRECATED: use -r instead of --report option.", + file=py.std.sys.stderr) + if optvalue: + for setting in optvalue.split(","): + setting = setting.strip() + if setting == "skipped": + reportopts += "s" + elif setting == "xfailed": + reportopts += "x" + reportchars = config.option.reportchars + if reportchars: + for char in reportchars: + if char not in reportopts: + reportopts += char + return reportopts + +def pytest_report_teststatus(report): + if report.passed: + letter = "." + elif report.skipped: + letter = "s" + elif report.failed: + letter = "F" + if report.when != "call": + letter = "f" + return report.outcome, letter, report.outcome.upper() + +class TerminalReporter: + def __init__(self, config, file=None): + self.config = config + self.verbosity = self.config.option.verbose + self.showheader = self.verbosity >= 0 + self.showfspath = self.verbosity >= 0 + self.showlongtestinfo = self.verbosity > 0 + self._numcollected = 0 + + self.stats = {} + self.curdir = py.path.local() + if file is None: + file = py.std.sys.stdout + self._tw = py.io.TerminalWriter(file) + self.currentfspath = None + self.reportchars = getreportopt(config) + self.hasmarkup = self._tw.hasmarkup + + def hasopt(self, char): + char = {'xfailed': 'x', 'skipped': 's'}.get(char,char) + return char in self.reportchars + + def write_fspath_result(self, fspath, res): + if fspath != self.currentfspath: + self.currentfspath = fspath + #fspath = self.curdir.bestrelpath(fspath) + self._tw.line() + #relpath = self.curdir.bestrelpath(fspath) + self._tw.write(fspath + " ") + self._tw.write(res) + + def write_ensure_prefix(self, prefix, extra="", **kwargs): + if self.currentfspath != prefix: + self._tw.line() + self.currentfspath = prefix + self._tw.write(prefix) + if extra: + self._tw.write(extra, **kwargs) + self.currentfspath = -2 + + def ensure_newline(self): + if self.currentfspath: + self._tw.line() + self.currentfspath = None + + def write(self, content, **markup): + self._tw.write(content, **markup) + + def write_line(self, line, **markup): + line = str(line) + self.ensure_newline() + self._tw.line(line, **markup) + + def rewrite(self, line, **markup): + line = str(line) + self._tw.write("\r" + line, **markup) + + def write_sep(self, sep, title=None, **markup): + self.ensure_newline() + self._tw.sep(sep, title, **markup) + + def pytest_internalerror(self, excrepr): + for line in str(excrepr).split("\n"): + self.write_line("INTERNALERROR> " + line) + return 1 + + def pytest_plugin_registered(self, plugin): + if self.config.option.traceconfig: + msg = "PLUGIN registered: %s" %(plugin,) + # XXX this event may happen during setup/teardown time + # which unfortunately captures our output here + # which garbles our output if we use self.write_line + self.write_line(msg) + + def pytest_deselected(self, items): + self.stats.setdefault('deselected', []).extend(items) + + def pytest__teardown_final_logerror(self, report): + self.stats.setdefault("error", []).append(report) + + def pytest_runtest_logstart(self, nodeid, location): + # ensure that the path is printed before the + # 1st test of a module starts running + fspath = nodeid.split("::")[0] + if self.showlongtestinfo: + line = self._locationline(fspath, *location) + self.write_ensure_prefix(line, "") + elif self.showfspath: + self.write_fspath_result(fspath, "") + + def pytest_runtest_logreport(self, report): + rep = report + res = self.config.hook.pytest_report_teststatus(report=rep) + cat, letter, word = res + self.stats.setdefault(cat, []).append(rep) + if not letter and not word: + # probably passed setup/teardown + return + if self.verbosity <= 0: + if not hasattr(rep, 'node') and self.showfspath: + self.write_fspath_result(rep.fspath, letter) + else: + self._tw.write(letter) + else: + if isinstance(word, tuple): + word, markup = word + else: + if rep.passed: + markup = {'green':True} + elif rep.failed: + markup = {'red':True} + elif rep.skipped: + markup = {'yellow':True} + line = self._locationline(str(rep.fspath), *rep.location) + if not hasattr(rep, 'node'): + self.write_ensure_prefix(line, word, **markup) + #self._tw.write(word, **markup) + else: + self.ensure_newline() + if hasattr(rep, 'node'): + self._tw.write("[%s] " % rep.node.gateway.id) + self._tw.write(word, **markup) + self._tw.write(" " + line) + self.currentfspath = -2 + + def pytest_collection(self): + if not self.hasmarkup: + self.write("collecting ... ", bold=True) + + def pytest_collectreport(self, report): + if report.failed: + self.stats.setdefault("error", []).append(report) + elif report.skipped: + self.stats.setdefault("skipped", []).append(report) + items = [x for x in report.result if isinstance(x, pytest.Item)] + self._numcollected += len(items) + if self.hasmarkup: + #self.write_fspath_result(report.fspath, 'E') + self.report_collect() + + def report_collect(self, final=False): + errors = len(self.stats.get('error', [])) + skipped = len(self.stats.get('skipped', [])) + if final: + line = "collected " + else: + line = "collecting " + line += str(self._numcollected) + " items" + if errors: + line += " / %d errors" % errors + if skipped: + line += " / %d skipped" % skipped + if self.hasmarkup: + if final: + line += " \n" + self.rewrite(line, bold=True) + else: + self.write_line(line) + + def pytest_collection_modifyitems(self): + self.report_collect(True) + + def pytest_sessionstart(self, session): + self._sessionstarttime = py.std.time.time() + if not self.showheader: + return + self.write_sep("=", "test session starts", bold=True) + verinfo = ".".join(map(str, sys.version_info[:3])) + msg = "platform %s -- Python %s" % (sys.platform, verinfo) + if hasattr(sys, 'pypy_version_info'): + verinfo = ".".join(map(str, sys.pypy_version_info[:3])) + msg += "[pypy-%s]" % verinfo + msg += " -- pytest-%s" % (py.test.__version__) + if self.verbosity > 0 or self.config.option.debug or \ + getattr(self.config.option, 'pastebin', None): + msg += " -- " + str(sys.executable) + self.write_line(msg) + lines = self.config.hook.pytest_report_header(config=self.config) + lines.reverse() + for line in flatten(lines): + self.write_line(line) + + def pytest_collection_finish(self): + if not self.showheader: + return + #for i, testarg in enumerate(self.config.args): + # self.write_line("test path %d: %s" %(i+1, testarg)) + + def pytest_sessionfinish(self, exitstatus, __multicall__): + __multicall__.execute() + self._tw.line("") + if exitstatus in (0, 1, 2): + self.summary_errors() + self.summary_failures() + self.config.hook.pytest_terminal_summary(terminalreporter=self) + if exitstatus == 2: + self._report_keyboardinterrupt() + self.summary_deselected() + self.summary_stats() + + def pytest_keyboard_interrupt(self, excinfo): + self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True) + + def _report_keyboardinterrupt(self): + excrepr = self._keyboardinterrupt_memo + msg = excrepr.reprcrash.message + self.write_sep("!", msg) + if "KeyboardInterrupt" in msg: + if self.config.option.fulltrace: + excrepr.toterminal(self._tw) + else: + excrepr.reprcrash.toterminal(self._tw) + + def _locationline(self, collect_fspath, fspath, lineno, domain): + if fspath and fspath != collect_fspath: + fspath = "%s <- %s" % (collect_fspath, fspath) + if lineno is not None: + lineno += 1 + if fspath and lineno and domain: + line = "%(fspath)s:%(lineno)s: %(domain)s" + elif fspath and domain: + line = "%(fspath)s: %(domain)s" + elif fspath and lineno: + line = "%(fspath)s:%(lineno)s %(extrapath)s" + else: + line = "[nolocation]" + return line % locals() + " " + + def _getfailureheadline(self, rep): + if hasattr(rep, 'location'): + fspath, lineno, domain = rep.location + return domain + else: + return "test session" # XXX? + + def _getcrashline(self, rep): + try: + return str(rep.longrepr.reprcrash) + except AttributeError: + try: + return str(rep.longrepr)[:50] + except AttributeError: + return "" + + # + # summaries for sessionfinish + # + def getreports(self, name): + l = [] + for x in self.stats.get(name, []): + if not hasattr(x, '_pdbshown'): + l.append(x) + return l + + def summary_failures(self): + if self.config.option.tbstyle != "no": + reports = self.getreports('failed') + if not reports: + return + self.write_sep("=", "FAILURES") + for rep in reports: + if self.config.option.tbstyle == "line": + line = self._getcrashline(rep) + self.write_line(line) + else: + msg = self._getfailureheadline(rep) + self.write_sep("_", msg) + rep.toterminal(self._tw) + + def summary_errors(self): + if self.config.option.tbstyle != "no": + reports = self.getreports('error') + if not reports: + return + self.write_sep("=", "ERRORS") + for rep in self.stats['error']: + msg = self._getfailureheadline(rep) + if not hasattr(rep, 'when'): + # collect + msg = "ERROR collecting " + msg + elif rep.when == "setup": + msg = "ERROR at setup of " + msg + elif rep.when == "teardown": + msg = "ERROR at teardown of " + msg + self.write_sep("_", msg) + rep.toterminal(self._tw) + + def summary_stats(self): + session_duration = py.std.time.time() - self._sessionstarttime + + keys = "failed passed skipped deselected".split() + for key in self.stats.keys(): + if key not in keys: + keys.append(key) + parts = [] + for key in keys: + val = self.stats.get(key, None) + if val: + parts.append("%d %s" %(len(val), key)) + line = ", ".join(parts) + # XXX coloring + msg = "%s in %.2f seconds" %(line, session_duration) + if self.verbosity >= 0: + self.write_sep("=", msg, bold=True) + else: + self.write_line(msg, bold=True) + + def summary_deselected(self): + if 'deselected' in self.stats: + self.write_sep("=", "%d tests deselected by %r" %( + len(self.stats['deselected']), self.config.option.keyword), bold=True) + + +class CollectonlyReporter: + INDENT = " " + + def __init__(self, config, out=None): + self.config = config + if out is None: + out = py.std.sys.stdout + self._tw = py.io.TerminalWriter(out) + self.indent = "" + self._failed = [] + + def outindent(self, line): + self._tw.line(self.indent + str(line)) + + def pytest_internalerror(self, excrepr): + for line in str(excrepr).split("\n"): + self._tw.line("INTERNALERROR> " + line) + + def pytest_collectstart(self, collector): + if collector.session != collector: + self.outindent(collector) + self.indent += self.INDENT + + def pytest_itemcollected(self, item): + self.outindent(item) + + def pytest_collectreport(self, report): + if not report.passed: + if hasattr(report.longrepr, 'reprcrash'): + msg = report.longrepr.reprcrash.message + else: + # XXX unify (we have CollectErrorRepr here) + msg = str(report.longrepr[2]) + self.outindent("!!! %s !!!" % msg) + #self.outindent("!!! error !!!") + self._failed.append(report) + self.indent = self.indent[:-len(self.INDENT)] + + def pytest_collection_finish(self): + if self._failed: + self._tw.sep("!", "collection failures") + for rep in self._failed: + rep.toterminal(self._tw) + return self._failed and 1 or 0 + +def repr_pythonversion(v=None): + if v is None: + v = sys.version_info + try: + return "%s.%s.%s-%s-%s" % v + except (TypeError, ValueError): + return str(v) + +def flatten(l): + for x in l: + if isinstance(x, (list, tuple)): + for y in flatten(x): + yield y + else: + yield x + diff --git a/_pytest/tmpdir.py b/_pytest/tmpdir.py new file mode 100644 index 0000000000..9eab52d545 --- /dev/null +++ b/_pytest/tmpdir.py @@ -0,0 +1,71 @@ +""" support for providing temporary directories to test functions. """ +import pytest, py +from _pytest.monkeypatch import monkeypatch + +class TempdirHandler: + def __init__(self, config): + self.config = config + self.trace = config.trace.get("tmpdir") + + def ensuretemp(self, string, dir=1): + """ (deprecated) return temporary directory path with + the given string as the trailing part. It is usually + better to use the 'tmpdir' function argument which + provides an empty unique-per-test-invocation directory + and is guaranteed to be empty. + """ + #py.log._apiwarn(">1.1", "use tmpdir function argument") + return self.getbasetemp().ensure(string, dir=dir) + + def mktemp(self, basename, numbered=True): + basetemp = self.getbasetemp() + if not numbered: + p = basetemp.mkdir(basename) + else: + p = py.path.local.make_numbered_dir(prefix=basename, + keep=0, rootdir=basetemp, lock_timeout=None) + self.trace("mktemp", p) + return p + + def getbasetemp(self): + """ return base temporary directory. """ + try: + return self._basetemp + except AttributeError: + basetemp = self.config.option.basetemp + if basetemp: + basetemp = py.path.local(basetemp) + if basetemp.check(): + basetemp.remove() + basetemp.mkdir() + else: + basetemp = py.path.local.make_numbered_dir(prefix='pytest-') + self._basetemp = t = basetemp + self.trace("new basetemp", t) + return t + + def finish(self): + self.trace("finish") + +def pytest_configure(config): + config._mp = mp = monkeypatch() + t = TempdirHandler(config) + mp.setattr(config, '_tmpdirhandler', t, raising=False) + mp.setattr(pytest, 'ensuretemp', t.ensuretemp, raising=False) + +def pytest_unconfigure(config): + config._tmpdirhandler.finish() + config._mp.undo() + +def pytest_funcarg__tmpdir(request): + """return a temporary directory path object + unique to each test function invocation, + created as a sub directory of the base temporary + directory. The returned object is a `py.path.local`_ + path object. + """ + name = request._pyfuncitem.name + name = py.std.re.sub("[\W]", "_", name) + x = request.config._tmpdirhandler.mktemp(name, numbered=True) + return x.realpath() + diff --git a/_pytest/unittest.py b/_pytest/unittest.py new file mode 100644 index 0000000000..5131a93e35 --- /dev/null +++ b/_pytest/unittest.py @@ -0,0 +1,139 @@ +""" discovery and running of std-library "unittest" style tests. """ +import pytest, py +import sys, pdb + +def pytest_pycollect_makeitem(collector, name, obj): + unittest = sys.modules.get('unittest') + if unittest is None: + return # nobody can have derived unittest.TestCase + try: + isunit = issubclass(obj, unittest.TestCase) + except KeyboardInterrupt: + raise + except Exception: + pass + else: + if isunit: + return UnitTestCase(name, parent=collector) + +class UnitTestCase(pytest.Class): + def collect(self): + loader = py.std.unittest.TestLoader() + for name in loader.getTestCaseNames(self.obj): + yield TestCaseFunction(name, parent=self) + + def setup(self): + meth = getattr(self.obj, 'setUpClass', None) + if meth is not None: + meth() + super(UnitTestCase, self).setup() + + def teardown(self): + meth = getattr(self.obj, 'tearDownClass', None) + if meth is not None: + meth() + super(UnitTestCase, self).teardown() + +class TestCaseFunction(pytest.Function): + _excinfo = None + + def __init__(self, name, parent): + super(TestCaseFunction, self).__init__(name, parent) + if hasattr(self._obj, 'todo'): + getattr(self._obj, 'im_func', self._obj).xfail = \ + pytest.mark.xfail(reason=str(self._obj.todo)) + + def setup(self): + self._testcase = self.parent.obj(self.name) + self._obj = getattr(self._testcase, self.name) + if hasattr(self._testcase, 'setup_method'): + self._testcase.setup_method(self._obj) + + def teardown(self): + if hasattr(self._testcase, 'teardown_method'): + self._testcase.teardown_method(self._obj) + + def startTest(self, testcase): + pass + + def _addexcinfo(self, rawexcinfo): + # unwrap potential exception info (see twisted trial support below) + rawexcinfo = getattr(rawexcinfo, '_rawexcinfo', rawexcinfo) + try: + excinfo = py.code.ExceptionInfo(rawexcinfo) + except TypeError: + try: + try: + l = py.std.traceback.format_exception(*rawexcinfo) + l.insert(0, "NOTE: Incompatible Exception Representation, " + "displaying natively:\n\n") + pytest.fail("".join(l), pytrace=False) + except (pytest.fail.Exception, KeyboardInterrupt): + raise + except: + pytest.fail("ERROR: Unknown Incompatible Exception " + "representation:\n%r" %(rawexcinfo,), pytrace=False) + except KeyboardInterrupt: + raise + except pytest.fail.Exception: + excinfo = py.code.ExceptionInfo() + self.__dict__.setdefault('_excinfo', []).append(excinfo) + + def addError(self, testcase, rawexcinfo): + self._addexcinfo(rawexcinfo) + def addFailure(self, testcase, rawexcinfo): + self._addexcinfo(rawexcinfo) + def addSkip(self, testcase, reason): + try: + pytest.skip(reason) + except pytest.skip.Exception: + self._addexcinfo(sys.exc_info()) + def addExpectedFailure(self, testcase, rawexcinfo, reason): + try: + pytest.xfail(str(reason)) + except pytest.xfail.Exception: + self._addexcinfo(sys.exc_info()) + def addUnexpectedSuccess(self, testcase, reason): + pass + def addSuccess(self, testcase): + pass + def stopTest(self, testcase): + pass + def runtest(self): + self._testcase(result=self) + +@pytest.mark.tryfirst +def pytest_runtest_makereport(item, call): + if isinstance(item, TestCaseFunction): + if item._excinfo: + call.excinfo = item._excinfo.pop(0) + del call.result + +# twisted trial support +def pytest_runtest_protocol(item, __multicall__): + if isinstance(item, TestCaseFunction): + if 'twisted.trial.unittest' in sys.modules: + ut = sys.modules['twisted.python.failure'] + Failure__init__ = ut.Failure.__init__.im_func + check_testcase_implements_trial_reporter() + def excstore(self, exc_value=None, exc_type=None, exc_tb=None): + if exc_value is None: + self._rawexcinfo = sys.exc_info() + else: + if exc_type is None: + exc_type = type(exc_value) + self._rawexcinfo = (exc_type, exc_value, exc_tb) + Failure__init__(self, exc_value, exc_type, exc_tb) + ut.Failure.__init__ = excstore + try: + return __multicall__.execute() + finally: + ut.Failure.__init__ = Failure__init__ + +def check_testcase_implements_trial_reporter(done=[]): + if done: + return + from zope.interface import classImplements + from twisted.trial.itrial import IReporter + classImplements(TestCaseFunction, IReporter) + done.append(1) |