aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorholger krekel <holger@merlinux.eu>2011-01-18 14:13:31 +0100
committerholger krekel <holger@merlinux.eu>2011-01-18 14:13:31 +0100
commitcd173a7f26ee3df1e038c131a3270036d7f561d0 (patch)
tree9c7db1502305a2419243f9a8e852c2323f2b1088 /_pytest
parentFix test_compile_framework_vref on 64-bit. (diff)
downloadpypy-cd173a7f26ee3df1e038c131a3270036d7f561d0.tar.gz
pypy-cd173a7f26ee3df1e038c131a3270036d7f561d0.tar.bz2
pypy-cd173a7f26ee3df1e038c131a3270036d7f561d0.zip
remove old py copy, add current pytest and py lib snapshots (from pytest-2.0.1dev and py-1.4.1dev)
and some initial tweeks to conftest.py
Diffstat (limited to '_pytest')
-rw-r--r--_pytest/__init__.py1
-rw-r--r--_pytest/assertion.py168
-rw-r--r--_pytest/capture.py228
-rw-r--r--_pytest/config.py434
-rw-r--r--_pytest/core.py444
-rw-r--r--_pytest/doctest.py87
-rwxr-xr-x_pytest/genscript.py73
-rw-r--r--_pytest/helpconfig.py182
-rw-r--r--_pytest/hookspec.py222
-rw-r--r--_pytest/junitxml.py173
-rw-r--r--_pytest/main.py517
-rw-r--r--_pytest/mark.py176
-rw-r--r--_pytest/monkeypatch.py103
-rw-r--r--_pytest/nose.py47
-rw-r--r--_pytest/pastebin.py63
-rw-r--r--_pytest/pdb.py76
-rw-r--r--_pytest/pytester.py674
-rw-r--r--_pytest/python.py855
-rw-r--r--_pytest/recwarn.py96
-rw-r--r--_pytest/resultlog.py93
-rw-r--r--_pytest/runner.py390
-rw-r--r--_pytest/skipping.py213
-rwxr-xr-x_pytest/standalonetemplate.py63
-rw-r--r--_pytest/terminal.py467
-rw-r--r--_pytest/tmpdir.py71
-rw-r--r--_pytest/unittest.py139
26 files changed, 6055 insertions, 0 deletions
diff --git a/_pytest/__init__.py b/_pytest/__init__.py
new file mode 100644
index 0000000000..792d600548
--- /dev/null
+++ b/_pytest/__init__.py
@@ -0,0 +1 @@
+#
diff --git a/_pytest/assertion.py b/_pytest/assertion.py
new file mode 100644
index 0000000000..8f09e66612
--- /dev/null
+++ b/_pytest/assertion.py
@@ -0,0 +1,168 @@
+"""
+support for presented detailed information in failing assertions.
+"""
+import py
+import sys
+from _pytest.monkeypatch import monkeypatch
+
+def pytest_addoption(parser):
+ group = parser.getgroup("debugconfig")
+ group._addoption('--no-assert', action="store_true", default=False,
+ dest="noassert",
+ help="disable python assert expression reinterpretation."),
+
+def pytest_configure(config):
+ # The _pytesthook attribute on the AssertionError is used by
+ # py._code._assertionnew to detect this plugin was loaded and in
+ # turn call the hooks defined here as part of the
+ # DebugInterpreter.
+ config._monkeypatch = m = monkeypatch()
+ warn_about_missing_assertion()
+ if not config.getvalue("noassert") and not config.getvalue("nomagic"):
+ def callbinrepr(op, left, right):
+ hook_result = config.hook.pytest_assertrepr_compare(
+ config=config, op=op, left=left, right=right)
+ for new_expl in hook_result:
+ if new_expl:
+ return '\n~'.join(new_expl)
+ m.setattr(py.builtin.builtins,
+ 'AssertionError', py.code._AssertionError)
+ m.setattr(py.code, '_reprcompare', callbinrepr)
+
+def pytest_unconfigure(config):
+ config._monkeypatch.undo()
+
+def warn_about_missing_assertion():
+ try:
+ assert False
+ except AssertionError:
+ pass
+ else:
+ sys.stderr.write("WARNING: failing tests may report as passing because "
+ "assertions are turned off! (are you using python -O?)\n")
+
+# Provide basestring in python3
+try:
+ basestring = basestring
+except NameError:
+ basestring = str
+
+
+def pytest_assertrepr_compare(op, left, right):
+ """return specialised explanations for some operators/operands"""
+ width = 80 - 15 - len(op) - 2 # 15 chars indentation, 1 space around op
+ left_repr = py.io.saferepr(left, maxsize=width/2)
+ right_repr = py.io.saferepr(right, maxsize=width-len(left_repr))
+ summary = '%s %s %s' % (left_repr, op, right_repr)
+
+ issequence = lambda x: isinstance(x, (list, tuple))
+ istext = lambda x: isinstance(x, basestring)
+ isdict = lambda x: isinstance(x, dict)
+ isset = lambda x: isinstance(x, set)
+
+ explanation = None
+ try:
+ if op == '==':
+ if istext(left) and istext(right):
+ explanation = _diff_text(left, right)
+ elif issequence(left) and issequence(right):
+ explanation = _compare_eq_sequence(left, right)
+ elif isset(left) and isset(right):
+ explanation = _compare_eq_set(left, right)
+ elif isdict(left) and isdict(right):
+ explanation = _diff_text(py.std.pprint.pformat(left),
+ py.std.pprint.pformat(right))
+ elif op == 'not in':
+ if istext(left) and istext(right):
+ explanation = _notin_text(left, right)
+ except py.builtin._sysex:
+ raise
+ except:
+ excinfo = py.code.ExceptionInfo()
+ explanation = ['(pytest_assertion plugin: representation of '
+ 'details failed. Probably an object has a faulty __repr__.)',
+ str(excinfo)
+ ]
+
+
+ if not explanation:
+ return None
+
+ # Don't include pageloads of data, should be configurable
+ if len(''.join(explanation)) > 80*8:
+ explanation = ['Detailed information too verbose, truncated']
+
+ return [summary] + explanation
+
+
+def _diff_text(left, right):
+ """Return the explanation for the diff between text
+
+ This will skip leading and trailing characters which are
+ identical to keep the diff minimal.
+ """
+ explanation = []
+ i = 0 # just in case left or right has zero length
+ for i in range(min(len(left), len(right))):
+ if left[i] != right[i]:
+ break
+ if i > 42:
+ i -= 10 # Provide some context
+ explanation = ['Skipping %s identical '
+ 'leading characters in diff' % i]
+ left = left[i:]
+ right = right[i:]
+ if len(left) == len(right):
+ for i in range(len(left)):
+ if left[-i] != right[-i]:
+ break
+ if i > 42:
+ i -= 10 # Provide some context
+ explanation += ['Skipping %s identical '
+ 'trailing characters in diff' % i]
+ left = left[:-i]
+ right = right[:-i]
+ explanation += [line.strip('\n')
+ for line in py.std.difflib.ndiff(left.splitlines(),
+ right.splitlines())]
+ return explanation
+
+
+def _compare_eq_sequence(left, right):
+ explanation = []
+ for i in range(min(len(left), len(right))):
+ if left[i] != right[i]:
+ explanation += ['At index %s diff: %r != %r' %
+ (i, left[i], right[i])]
+ break
+ if len(left) > len(right):
+ explanation += ['Left contains more items, '
+ 'first extra item: %s' % py.io.saferepr(left[len(right)],)]
+ elif len(left) < len(right):
+ explanation += ['Right contains more items, '
+ 'first extra item: %s' % py.io.saferepr(right[len(left)],)]
+ return explanation # + _diff_text(py.std.pprint.pformat(left),
+ # py.std.pprint.pformat(right))
+
+
+def _compare_eq_set(left, right):
+ explanation = []
+ diff_left = left - right
+ diff_right = right - left
+ if diff_left:
+ explanation.append('Extra items in the left set:')
+ for item in diff_left:
+ explanation.append(py.io.saferepr(item))
+ if diff_right:
+ explanation.append('Extra items in the right set:')
+ for item in diff_right:
+ explanation.append(py.io.saferepr(item))
+ return explanation
+
+
+def _notin_text(term, text):
+ index = text.find(term)
+ head = text[:index]
+ tail = text[index+len(term):]
+ correct_text = head + tail
+ return _diff_text(correct_text, text)
diff --git a/_pytest/capture.py b/_pytest/capture.py
new file mode 100644
index 0000000000..2da398383d
--- /dev/null
+++ b/_pytest/capture.py
@@ -0,0 +1,228 @@
+""" per-test stdout/stderr capturing mechanisms, ``capsys`` and ``capfd`` function arguments. """
+
+import pytest, py
+import os
+
+def pytest_addoption(parser):
+ group = parser.getgroup("general")
+ group._addoption('--capture', action="store", default=None,
+ metavar="method", type="choice", choices=['fd', 'sys', 'no'],
+ help="per-test capturing method: one of fd (default)|sys|no.")
+ group._addoption('-s', action="store_const", const="no", dest="capture",
+ help="shortcut for --capture=no.")
+
+def addouterr(rep, outerr):
+ repr = getattr(rep, 'longrepr', None)
+ if not hasattr(repr, 'addsection'):
+ return
+ for secname, content in zip(["out", "err"], outerr):
+ if content:
+ repr.addsection("Captured std%s" % secname, content.rstrip())
+
+def pytest_unconfigure(config):
+ # registered in config.py during early conftest.py loading
+ capman = config.pluginmanager.getplugin('capturemanager')
+ while capman._method2capture:
+ name, cap = capman._method2capture.popitem()
+ # XXX logging module may wants to close it itself on process exit
+ # otherwise we could do finalization here and call "reset()".
+ cap.suspend()
+
+class NoCapture:
+ def startall(self):
+ pass
+ def resume(self):
+ pass
+ def reset(self):
+ pass
+ def suspend(self):
+ return "", ""
+
+class CaptureManager:
+ def __init__(self):
+ self._method2capture = {}
+
+ def _maketempfile(self):
+ f = py.std.tempfile.TemporaryFile()
+ newf = py.io.dupfile(f, encoding="UTF-8")
+ f.close()
+ return newf
+
+ def _makestringio(self):
+ return py.io.TextIO()
+
+ def _getcapture(self, method):
+ if method == "fd":
+ return py.io.StdCaptureFD(now=False,
+ out=self._maketempfile(), err=self._maketempfile()
+ )
+ elif method == "sys":
+ return py.io.StdCapture(now=False,
+ out=self._makestringio(), err=self._makestringio()
+ )
+ elif method == "no":
+ return NoCapture()
+ else:
+ raise ValueError("unknown capturing method: %r" % method)
+
+ def _getmethod_preoptionparse(self, args):
+ if '-s' in args or "--capture=no" in args:
+ return "no"
+ elif hasattr(os, 'dup') and '--capture=sys' not in args:
+ return "fd"
+ else:
+ return "sys"
+
+ def _getmethod(self, config, fspath):
+ if config.option.capture:
+ method = config.option.capture
+ else:
+ try:
+ method = config._conftest.rget("option_capture", path=fspath)
+ except KeyError:
+ method = "fd"
+ if method == "fd" and not hasattr(os, 'dup'): # e.g. jython
+ method = "sys"
+ return method
+
+ def resumecapture_item(self, item):
+ method = self._getmethod(item.config, item.fspath)
+ if not hasattr(item, 'outerr'):
+ item.outerr = ('', '') # we accumulate outerr on the item
+ return self.resumecapture(method)
+
+ def resumecapture(self, method):
+ if hasattr(self, '_capturing'):
+ raise ValueError("cannot resume, already capturing with %r" %
+ (self._capturing,))
+ cap = self._method2capture.get(method)
+ self._capturing = method
+ if cap is None:
+ self._method2capture[method] = cap = self._getcapture(method)
+ cap.startall()
+ else:
+ cap.resume()
+
+ def suspendcapture(self, item=None):
+ self.deactivate_funcargs()
+ if hasattr(self, '_capturing'):
+ method = self._capturing
+ cap = self._method2capture.get(method)
+ if cap is not None:
+ outerr = cap.suspend()
+ del self._capturing
+ if item:
+ outerr = (item.outerr[0] + outerr[0],
+ item.outerr[1] + outerr[1])
+ return outerr
+ if hasattr(item, 'outerr'):
+ return item.outerr
+ return "", ""
+
+ def activate_funcargs(self, pyfuncitem):
+ if not hasattr(pyfuncitem, 'funcargs'):
+ return
+ assert not hasattr(self, '_capturing_funcargs')
+ self._capturing_funcargs = capturing_funcargs = []
+ for name, capfuncarg in pyfuncitem.funcargs.items():
+ if name in ('capsys', 'capfd'):
+ capturing_funcargs.append(capfuncarg)
+ capfuncarg._start()
+
+ def deactivate_funcargs(self):
+ capturing_funcargs = getattr(self, '_capturing_funcargs', None)
+ if capturing_funcargs is not None:
+ while capturing_funcargs:
+ capfuncarg = capturing_funcargs.pop()
+ capfuncarg._finalize()
+ del self._capturing_funcargs
+
+ def pytest_make_collect_report(self, __multicall__, collector):
+ method = self._getmethod(collector.config, collector.fspath)
+ try:
+ self.resumecapture(method)
+ except ValueError:
+ return # recursive collect, XXX refactor capturing
+ # to allow for more lightweight recursive capturing
+ try:
+ rep = __multicall__.execute()
+ finally:
+ outerr = self.suspendcapture()
+ addouterr(rep, outerr)
+ return rep
+
+ @pytest.mark.tryfirst
+ def pytest_runtest_setup(self, item):
+ self.resumecapture_item(item)
+
+ @pytest.mark.tryfirst
+ def pytest_runtest_call(self, item):
+ self.resumecapture_item(item)
+ self.activate_funcargs(item)
+
+ @pytest.mark.tryfirst
+ def pytest_runtest_teardown(self, item):
+ self.resumecapture_item(item)
+
+ def pytest__teardown_final(self, __multicall__, session):
+ method = self._getmethod(session.config, None)
+ self.resumecapture(method)
+ try:
+ rep = __multicall__.execute()
+ finally:
+ outerr = self.suspendcapture()
+ if rep:
+ addouterr(rep, outerr)
+ return rep
+
+ def pytest_keyboard_interrupt(self, excinfo):
+ if hasattr(self, '_capturing'):
+ self.suspendcapture()
+
+ @pytest.mark.tryfirst
+ def pytest_runtest_makereport(self, __multicall__, item, call):
+ self.deactivate_funcargs()
+ rep = __multicall__.execute()
+ outerr = self.suspendcapture(item)
+ if not rep.passed:
+ addouterr(rep, outerr)
+ if not rep.passed or rep.when == "teardown":
+ outerr = ('', '')
+ item.outerr = outerr
+ return rep
+
+def pytest_funcarg__capsys(request):
+ """captures writes to sys.stdout/sys.stderr and makes
+ them available successively via a ``capsys.readouterr()`` method
+ which returns a ``(out, err)`` tuple of captured snapshot strings.
+ """
+ return CaptureFuncarg(py.io.StdCapture)
+
+def pytest_funcarg__capfd(request):
+ """captures writes to file descriptors 1 and 2 and makes
+ snapshotted ``(out, err)`` string tuples available
+ via the ``capsys.readouterr()`` method. If the underlying
+ platform does not have ``os.dup`` (e.g. Jython) tests using
+ this funcarg will automatically skip.
+ """
+ if not hasattr(os, 'dup'):
+ py.test.skip("capfd funcarg needs os.dup")
+ return CaptureFuncarg(py.io.StdCaptureFD)
+
+class CaptureFuncarg:
+ def __init__(self, captureclass):
+ self.capture = captureclass(now=False)
+
+ def _start(self):
+ self.capture.startall()
+
+ def _finalize(self):
+ if hasattr(self, 'capture'):
+ self.capture.reset()
+ del self.capture
+
+ def readouterr(self):
+ return self.capture.readouterr()
+
+ def close(self):
+ self._finalize()
diff --git a/_pytest/config.py b/_pytest/config.py
new file mode 100644
index 0000000000..0c6c2e1628
--- /dev/null
+++ b/_pytest/config.py
@@ -0,0 +1,434 @@
+""" command line options, ini-file and conftest.py processing. """
+
+import py
+import sys, os
+from _pytest.core import PluginManager
+import pytest
+
+def pytest_cmdline_parse(pluginmanager, args):
+ config = Config(pluginmanager)
+ config.parse(args)
+ if config.option.debug:
+ config.trace.root.setwriter(sys.stderr.write)
+ return config
+
+class Parser:
+ """ Parser for command line arguments. """
+
+ def __init__(self, usage=None, processopt=None):
+ self._anonymous = OptionGroup("custom options", parser=self)
+ self._groups = []
+ self._processopt = processopt
+ self._usage = usage
+ self._inidict = {}
+ self._ininames = []
+ self.hints = []
+
+ def processoption(self, option):
+ if self._processopt:
+ if option.dest:
+ self._processopt(option)
+
+ def addnote(self, note):
+ self._notes.append(note)
+
+ def getgroup(self, name, description="", after=None):
+ """ get (or create) a named option Group.
+
+ :name: unique name of the option group.
+ :description: long description for --help output.
+ :after: name of other group, used for ordering --help output.
+ """
+ for group in self._groups:
+ if group.name == name:
+ return group
+ group = OptionGroup(name, description, parser=self)
+ i = 0
+ for i, grp in enumerate(self._groups):
+ if grp.name == after:
+ break
+ self._groups.insert(i+1, group)
+ return group
+
+ def addoption(self, *opts, **attrs):
+ """ add an optparse-style option. """
+ self._anonymous.addoption(*opts, **attrs)
+
+ def parse(self, args):
+ self.optparser = optparser = MyOptionParser(self)
+ groups = self._groups + [self._anonymous]
+ for group in groups:
+ if group.options:
+ desc = group.description or group.name
+ optgroup = py.std.optparse.OptionGroup(optparser, desc)
+ optgroup.add_options(group.options)
+ optparser.add_option_group(optgroup)
+ return self.optparser.parse_args([str(x) for x in args])
+
+ def parse_setoption(self, args, option):
+ parsedoption, args = self.parse(args)
+ for name, value in parsedoption.__dict__.items():
+ setattr(option, name, value)
+ return args
+
+ def addini(self, name, help, type=None, default=None):
+ """ add an ini-file option with the given name and description. """
+ assert type in (None, "pathlist", "args", "linelist")
+ self._inidict[name] = (help, type, default)
+ self._ininames.append(name)
+
+class OptionGroup:
+ def __init__(self, name, description="", parser=None):
+ self.name = name
+ self.description = description
+ self.options = []
+ self.parser = parser
+
+ def addoption(self, *optnames, **attrs):
+ """ add an option to this group. """
+ option = py.std.optparse.Option(*optnames, **attrs)
+ self._addoption_instance(option, shortupper=False)
+
+ def _addoption(self, *optnames, **attrs):
+ option = py.std.optparse.Option(*optnames, **attrs)
+ self._addoption_instance(option, shortupper=True)
+
+ def _addoption_instance(self, option, shortupper=False):
+ if not shortupper:
+ for opt in option._short_opts:
+ if opt[0] == '-' and opt[1].islower():
+ raise ValueError("lowercase shortoptions reserved")
+ if self.parser:
+ self.parser.processoption(option)
+ self.options.append(option)
+
+
+class MyOptionParser(py.std.optparse.OptionParser):
+ def __init__(self, parser):
+ self._parser = parser
+ py.std.optparse.OptionParser.__init__(self, usage=parser._usage,
+ add_help_option=False)
+ def format_epilog(self, formatter):
+ hints = self._parser.hints
+ if hints:
+ s = "\n".join(["hint: " + x for x in hints]) + "\n"
+ s = "\n" + s + "\n"
+ return s
+ return ""
+
+class Conftest(object):
+ """ the single place for accessing values and interacting
+ towards conftest modules from py.test objects.
+ """
+ def __init__(self, onimport=None, confcutdir=None):
+ self._path2confmods = {}
+ self._onimport = onimport
+ self._conftestpath2mod = {}
+ self._confcutdir = confcutdir
+
+ def setinitial(self, args):
+ """ try to find a first anchor path for looking up global values
+ from conftests. This function is usually called _before_
+ argument parsing. conftest files may add command line options
+ and we thus have no completely safe way of determining
+ which parts of the arguments are actually related to options
+ and which are file system paths. We just try here to get
+ bootstrapped ...
+ """
+ current = py.path.local()
+ opt = '--confcutdir'
+ for i in range(len(args)):
+ opt1 = str(args[i])
+ if opt1.startswith(opt):
+ if opt1 == opt:
+ if len(args) > i:
+ p = current.join(args[i+1], abs=True)
+ elif opt1.startswith(opt + "="):
+ p = current.join(opt1[len(opt)+1:], abs=1)
+ self._confcutdir = p
+ break
+ for arg in args + [current]:
+ if hasattr(arg, 'startswith') and arg.startswith("--"):
+ continue
+ anchor = current.join(arg, abs=1)
+ if anchor.check(): # we found some file object
+ self._path2confmods[None] = self.getconftestmodules(anchor)
+ # let's also consider test* dirs
+ if anchor.check(dir=1):
+ for x in anchor.listdir("test*"):
+ if x.check(dir=1):
+ self.getconftestmodules(x)
+ break
+ else:
+ assert 0, "no root of filesystem?"
+
+ def getconftestmodules(self, path):
+ """ return a list of imported conftest modules for the given path. """
+ try:
+ clist = self._path2confmods[path]
+ except KeyError:
+ if path is None:
+ raise ValueError("missing default confest.")
+ dp = path.dirpath()
+ clist = []
+ if dp != path:
+ cutdir = self._confcutdir
+ if cutdir and path != cutdir and not path.relto(cutdir):
+ pass
+ else:
+ conftestpath = path.join("conftest.py")
+ if conftestpath.check(file=1):
+ clist.append(self.importconftest(conftestpath))
+ clist[:0] = self.getconftestmodules(dp)
+ self._path2confmods[path] = clist
+ # be defensive: avoid changes from caller side to
+ # affect us by always returning a copy of the actual list
+ return clist[:]
+
+ def rget(self, name, path=None):
+ mod, value = self.rget_with_confmod(name, path)
+ return value
+
+ def rget_with_confmod(self, name, path=None):
+ modules = self.getconftestmodules(path)
+ modules.reverse()
+ for mod in modules:
+ try:
+ return mod, getattr(mod, name)
+ except AttributeError:
+ continue
+ raise KeyError(name)
+
+ def importconftest(self, conftestpath):
+ assert conftestpath.check(), conftestpath
+ try:
+ return self._conftestpath2mod[conftestpath]
+ except KeyError:
+ pkgpath = conftestpath.pypkgpath()
+ if pkgpath is None:
+ _ensure_removed_sysmodule(conftestpath.purebasename)
+ self._conftestpath2mod[conftestpath] = mod = conftestpath.pyimport()
+ dirpath = conftestpath.dirpath()
+ if dirpath in self._path2confmods:
+ for path, mods in self._path2confmods.items():
+ if path and path.relto(dirpath) or path == dirpath:
+ assert mod not in mods
+ mods.append(mod)
+ self._postimport(mod)
+ return mod
+
+ def _postimport(self, mod):
+ if self._onimport:
+ self._onimport(mod)
+ return mod
+
+def _ensure_removed_sysmodule(modname):
+ try:
+ del sys.modules[modname]
+ except KeyError:
+ pass
+
+class CmdOptions(object):
+ """ holds cmdline options as attributes."""
+ def __init__(self, **kwargs):
+ self.__dict__.update(kwargs)
+ def __repr__(self):
+ return "<CmdOptions %r>" %(self.__dict__,)
+
+class Config(object):
+ """ access to configuration values, pluginmanager and plugin hooks. """
+ def __init__(self, pluginmanager=None):
+ #: command line option values, usually added via parser.addoption(...)
+ #: or parser.getgroup(...).addoption(...) calls
+ self.option = CmdOptions()
+ self._parser = Parser(
+ usage="usage: %prog [options] [file_or_dir] [file_or_dir] [...]",
+ processopt=self._processopt,
+ )
+ #: a pluginmanager instance
+ self.pluginmanager = pluginmanager or PluginManager(load=True)
+ self.trace = self.pluginmanager.trace.root.get("config")
+ self._conftest = Conftest(onimport=self._onimportconftest)
+ self.hook = self.pluginmanager.hook
+ self._inicache = {}
+
+ def _onimportconftest(self, conftestmodule):
+ self.trace("loaded conftestmodule %r" %(conftestmodule,))
+ self.pluginmanager.consider_conftest(conftestmodule)
+
+ def _processopt(self, opt):
+ if hasattr(opt, 'default') and opt.dest:
+ if not hasattr(self.option, opt.dest):
+ setattr(self.option, opt.dest, opt.default)
+
+ def _getmatchingplugins(self, fspath):
+ allconftests = self._conftest._conftestpath2mod.values()
+ plugins = [x for x in self.pluginmanager.getplugins()
+ if x not in allconftests]
+ plugins += self._conftest.getconftestmodules(fspath)
+ return plugins
+
+ def _setinitialconftest(self, args):
+ # capture output during conftest init (#issue93)
+ from _pytest.capture import CaptureManager
+ capman = CaptureManager()
+ self.pluginmanager.register(capman, 'capturemanager')
+ # will be unregistered in capture.py's unconfigure()
+ capman.resumecapture(capman._getmethod_preoptionparse(args))
+ try:
+ try:
+ self._conftest.setinitial(args)
+ finally:
+ out, err = capman.suspendcapture() # logging might have got it
+ except:
+ sys.stdout.write(out)
+ sys.stderr.write(err)
+ raise
+
+ def _initini(self, args):
+ self.inicfg = getcfg(args, ["pytest.ini", "tox.ini", "setup.cfg"])
+ self._parser.addini('addopts', 'extra command line options', 'args')
+ self._parser.addini('minversion', 'minimally required pytest version')
+
+ def _preparse(self, args, addopts=True):
+ self._initini(args)
+ if addopts:
+ args[:] = self.getini("addopts") + args
+ self._checkversion()
+ self.pluginmanager.consider_preparse(args)
+ self.pluginmanager.consider_setuptools_entrypoints()
+ self.pluginmanager.consider_env()
+ self._setinitialconftest(args)
+ self.pluginmanager.do_addoption(self._parser)
+ if addopts:
+ self.hook.pytest_cmdline_preparse(config=self, args=args)
+
+ def _checkversion(self):
+ minver = self.inicfg.get('minversion', None)
+ if minver:
+ ver = minver.split(".")
+ myver = pytest.__version__.split(".")
+ if myver < ver:
+ raise pytest.UsageError(
+ "%s:%d: requires pytest-%s, actual pytest-%s'" %(
+ self.inicfg.config.path, self.inicfg.lineof('minversion'),
+ minver, pytest.__version__))
+
+ def parse(self, args):
+ # parse given cmdline arguments into this config object.
+ # Note that this can only be called once per testing process.
+ assert not hasattr(self, 'args'), (
+ "can only parse cmdline args at most once per Config object")
+ self._preparse(args)
+ self._parser.hints.extend(self.pluginmanager._hints)
+ args = self._parser.parse_setoption(args, self.option)
+ if not args:
+ args.append(py.std.os.getcwd())
+ self.args = args
+
+ def getini(self, name):
+ """ return configuration value from an ini file. If the
+ specified name hasn't been registered through a prior ``parse.addini``
+ call (usually from a plugin), a ValueError is raised. """
+ try:
+ return self._inicache[name]
+ except KeyError:
+ self._inicache[name] = val = self._getini(name)
+ return val
+
+ def _getini(self, name):
+ try:
+ description, type, default = self._parser._inidict[name]
+ except KeyError:
+ raise ValueError("unknown configuration value: %r" %(name,))
+ try:
+ value = self.inicfg[name]
+ except KeyError:
+ if default is not None:
+ return default
+ if type is None:
+ return ''
+ return []
+ if type == "pathlist":
+ dp = py.path.local(self.inicfg.config.path).dirpath()
+ l = []
+ for relpath in py.std.shlex.split(value):
+ l.append(dp.join(relpath, abs=True))
+ return l
+ elif type == "args":
+ return py.std.shlex.split(value)
+ elif type == "linelist":
+ return [t for t in map(lambda x: x.strip(), value.split("\n")) if t]
+ else:
+ assert type is None
+ return value
+
+ def _getconftest_pathlist(self, name, path=None):
+ try:
+ mod, relroots = self._conftest.rget_with_confmod(name, path)
+ except KeyError:
+ return None
+ modpath = py.path.local(mod.__file__).dirpath()
+ l = []
+ for relroot in relroots:
+ if not isinstance(relroot, py.path.local):
+ relroot = relroot.replace("/", py.path.local.sep)
+ relroot = modpath.join(relroot, abs=True)
+ l.append(relroot)
+ return l
+
+ def _getconftest(self, name, path=None, check=False):
+ if check:
+ self._checkconftest(name)
+ return self._conftest.rget(name, path)
+
+ def getvalue(self, name, path=None):
+ """ return ``name`` value looked set from command line options.
+
+ (deprecated) if we can't find the option also lookup
+ the name in a matching conftest file.
+ """
+ try:
+ return getattr(self.option, name)
+ except AttributeError:
+ return self._getconftest(name, path, check=False)
+
+ def getvalueorskip(self, name, path=None):
+ """ (deprecated) return getvalue(name) or call
+ py.test.skip if no value exists. """
+ __tracebackhide__ = True
+ try:
+ val = self.getvalue(name, path)
+ if val is None:
+ raise KeyError(name)
+ return val
+ except KeyError:
+ py.test.skip("no %r value found" %(name,))
+
+
+def getcfg(args, inibasenames):
+ args = [x for x in args if str(x)[0] != "-"]
+ if not args:
+ args = [py.path.local()]
+ for arg in args:
+ arg = py.path.local(arg)
+ for base in arg.parts(reverse=True):
+ for inibasename in inibasenames:
+ p = base.join(inibasename)
+ if p.check():
+ iniconfig = py.iniconfig.IniConfig(p)
+ if 'pytest' in iniconfig.sections:
+ return iniconfig['pytest']
+ return {}
+
+def findupwards(current, basename):
+ current = py.path.local(current)
+ while 1:
+ p = current.join(basename)
+ if p.check():
+ return p
+ p = current.dirpath()
+ if p == current:
+ return
+ current = p
+
diff --git a/_pytest/core.py b/_pytest/core.py
new file mode 100644
index 0000000000..7a67e7cbdf
--- /dev/null
+++ b/_pytest/core.py
@@ -0,0 +1,444 @@
+"""
+pytest PluginManager, basic initialization and tracing.
+(c) Holger Krekel 2004-2010
+"""
+import sys, os
+import inspect
+import py
+from _pytest import hookspec # the extension point definitions
+
+assert py.__version__.split(".")[:2] >= ['1', '4'], ("installation problem: "
+ "%s is too old, remove or upgrade 'py'" % (py.__version__))
+
+default_plugins = (
+ "config mark main terminal runner python pdb unittest capture skipping "
+ "tmpdir monkeypatch recwarn pastebin helpconfig nose assertion genscript "
+ "junitxml resultlog doctest").split()
+
+class TagTracer:
+ def __init__(self, prefix="[pytest] "):
+ self._tag2proc = {}
+ self.writer = None
+ self.indent = 0
+ self.prefix = prefix
+
+ def get(self, name):
+ return TagTracerSub(self, (name,))
+
+ def processmessage(self, tags, args):
+ if self.writer is not None:
+ if args:
+ indent = " " * self.indent
+ content = " ".join(map(str, args))
+ self.writer("%s%s%s\n" %(self.prefix, indent, content))
+ try:
+ self._tag2proc[tags](tags, args)
+ except KeyError:
+ pass
+
+ def setwriter(self, writer):
+ self.writer = writer
+
+ def setprocessor(self, tags, processor):
+ if isinstance(tags, str):
+ tags = tuple(tags.split(":"))
+ else:
+ assert isinstance(tags, tuple)
+ self._tag2proc[tags] = processor
+
+class TagTracerSub:
+ def __init__(self, root, tags):
+ self.root = root
+ self.tags = tags
+ def __call__(self, *args):
+ self.root.processmessage(self.tags, args)
+ def setmyprocessor(self, processor):
+ self.root.setprocessor(self.tags, processor)
+ def get(self, name):
+ return self.__class__(self.root, self.tags + (name,))
+
+class PluginManager(object):
+ def __init__(self, load=False):
+ self._name2plugin = {}
+ self._plugins = []
+ self._hints = []
+ self.trace = TagTracer().get("pluginmanage")
+ self._plugin_distinfo = []
+ if os.environ.get('PYTEST_DEBUG'):
+ err = sys.stderr
+ encoding = getattr(err, 'encoding', 'utf8')
+ try:
+ err = py.io.dupfile(err, encoding=encoding)
+ except Exception:
+ pass
+ self.trace.root.setwriter(err.write)
+ self.hook = HookRelay([hookspec], pm=self)
+ self.register(self)
+ if load:
+ for spec in default_plugins:
+ self.import_plugin(spec)
+
+ def register(self, plugin, name=None, prepend=False):
+ assert not self.isregistered(plugin), plugin
+ name = name or getattr(plugin, '__name__', str(id(plugin)))
+ if name in self._name2plugin:
+ return False
+ #self.trace("registering", name, plugin)
+ self._name2plugin[name] = plugin
+ self.call_plugin(plugin, "pytest_addhooks", {'pluginmanager': self})
+ self.hook.pytest_plugin_registered(manager=self, plugin=plugin)
+ if not prepend:
+ self._plugins.append(plugin)
+ else:
+ self._plugins.insert(0, plugin)
+ return True
+
+ def unregister(self, plugin=None, name=None):
+ if plugin is None:
+ plugin = self.getplugin(name=name)
+ self._plugins.remove(plugin)
+ self.hook.pytest_plugin_unregistered(plugin=plugin)
+ for name, value in list(self._name2plugin.items()):
+ if value == plugin:
+ del self._name2plugin[name]
+
+ def isregistered(self, plugin, name=None):
+ if self.getplugin(name) is not None:
+ return True
+ for val in self._name2plugin.values():
+ if plugin == val:
+ return True
+
+ def addhooks(self, spec):
+ self.hook._addhooks(spec, prefix="pytest_")
+
+ def getplugins(self):
+ return list(self._plugins)
+
+ def skipifmissing(self, name):
+ if not self.hasplugin(name):
+ py.test.skip("plugin %r is missing" % name)
+
+ def hasplugin(self, name):
+ return bool(self.getplugin(name))
+
+ def getplugin(self, name):
+ if name is None:
+ return None
+ try:
+ return self._name2plugin[name]
+ except KeyError:
+ return self._name2plugin.get("_pytest." + name, None)
+
+ # API for bootstrapping
+ #
+ def _envlist(self, varname):
+ val = py.std.os.environ.get(varname, None)
+ if val is not None:
+ return val.split(',')
+ return ()
+
+ def consider_env(self):
+ for spec in self._envlist("PYTEST_PLUGINS"):
+ self.import_plugin(spec)
+
+ def consider_setuptools_entrypoints(self):
+ try:
+ from pkg_resources import iter_entry_points, DistributionNotFound
+ except ImportError:
+ return # XXX issue a warning
+ for ep in iter_entry_points('pytest11'):
+ name = ep.name
+ if name.startswith("pytest_"):
+ name = name[7:]
+ if ep.name in self._name2plugin or name in self._name2plugin:
+ continue
+ try:
+ plugin = ep.load()
+ except DistributionNotFound:
+ continue
+ self._plugin_distinfo.append((ep.dist, plugin))
+ self.register(plugin, name=name)
+
+ def consider_preparse(self, args):
+ for opt1,opt2 in zip(args, args[1:]):
+ if opt1 == "-p":
+ if opt2.startswith("no:"):
+ name = opt2[3:]
+ if self.getplugin(name) is not None:
+ self.unregister(None, name=name)
+ self._name2plugin[name] = -1
+ else:
+ if self.getplugin(opt2) is None:
+ self.import_plugin(opt2)
+
+ def consider_conftest(self, conftestmodule):
+ if self.register(conftestmodule, name=conftestmodule.__file__):
+ self.consider_module(conftestmodule)
+
+ def consider_module(self, mod):
+ attr = getattr(mod, "pytest_plugins", ())
+ if attr:
+ if not isinstance(attr, (list, tuple)):
+ attr = (attr,)
+ for spec in attr:
+ self.import_plugin(spec)
+
+ def import_plugin(self, modname):
+ assert isinstance(modname, str)
+ if self.getplugin(modname) is not None:
+ return
+ try:
+ #self.trace("importing", modname)
+ mod = importplugin(modname)
+ except KeyboardInterrupt:
+ raise
+ except ImportError:
+ if modname.startswith("pytest_"):
+ return self.import_plugin(modname[7:])
+ raise
+ except:
+ e = py.std.sys.exc_info()[1]
+ if not hasattr(py.test, 'skip'):
+ raise
+ elif not isinstance(e, py.test.skip.Exception):
+ raise
+ self._hints.append("skipped plugin %r: %s" %((modname, e.msg)))
+ else:
+ self.register(mod, modname)
+ self.consider_module(mod)
+
+ def pytest_plugin_registered(self, plugin):
+ import pytest
+ dic = self.call_plugin(plugin, "pytest_namespace", {}) or {}
+ if dic:
+ self._setns(pytest, dic)
+ if hasattr(self, '_config'):
+ self.call_plugin(plugin, "pytest_addoption",
+ {'parser': self._config._parser})
+ self.call_plugin(plugin, "pytest_configure",
+ {'config': self._config})
+
+ def _setns(self, obj, dic):
+ import pytest
+ for name, value in dic.items():
+ if isinstance(value, dict):
+ mod = getattr(obj, name, None)
+ if mod is None:
+ modname = "pytest.%s" % name
+ mod = py.std.types.ModuleType(modname)
+ sys.modules[modname] = mod
+ mod.__all__ = []
+ setattr(obj, name, mod)
+ obj.__all__.append(name)
+ self._setns(mod, value)
+ else:
+ setattr(obj, name, value)
+ obj.__all__.append(name)
+ #if obj != pytest:
+ # pytest.__all__.append(name)
+ setattr(pytest, name, value)
+
+ def pytest_terminal_summary(self, terminalreporter):
+ tw = terminalreporter._tw
+ if terminalreporter.config.option.traceconfig:
+ for hint in self._hints:
+ tw.line("hint: %s" % hint)
+
+ def do_addoption(self, parser):
+ mname = "pytest_addoption"
+ methods = reversed(self.listattr(mname))
+ MultiCall(methods, {'parser': parser}).execute()
+
+ def do_configure(self, config):
+ assert not hasattr(self, '_config')
+ self._config = config
+ config.hook.pytest_configure(config=self._config)
+
+ def do_unconfigure(self, config):
+ config = self._config
+ del self._config
+ config.hook.pytest_unconfigure(config=config)
+ config.pluginmanager.unregister(self)
+
+ def notify_exception(self, excinfo):
+ excrepr = excinfo.getrepr(funcargs=True, showlocals=True)
+ res = self.hook.pytest_internalerror(excrepr=excrepr)
+ if not py.builtin.any(res):
+ for line in str(excrepr).split("\n"):
+ sys.stderr.write("INTERNALERROR> %s\n" %line)
+ sys.stderr.flush()
+
+ def listattr(self, attrname, plugins=None):
+ if plugins is None:
+ plugins = self._plugins
+ l = []
+ last = []
+ for plugin in plugins:
+ try:
+ meth = getattr(plugin, attrname)
+ if hasattr(meth, 'tryfirst'):
+ last.append(meth)
+ elif hasattr(meth, 'trylast'):
+ l.insert(0, meth)
+ else:
+ l.append(meth)
+ except AttributeError:
+ continue
+ l.extend(last)
+ return l
+
+ def call_plugin(self, plugin, methname, kwargs):
+ return MultiCall(methods=self.listattr(methname, plugins=[plugin]),
+ kwargs=kwargs, firstresult=True).execute()
+
+
+def importplugin(importspec):
+ name = importspec
+ try:
+ mod = "_pytest." + name
+ return __import__(mod, None, None, '__doc__')
+ except ImportError:
+ #e = py.std.sys.exc_info()[1]
+ #if str(e).find(name) == -1:
+ # raise
+ pass #
+ return __import__(importspec, None, None, '__doc__')
+
+class MultiCall:
+ """ execute a call into multiple python functions/methods. """
+ def __init__(self, methods, kwargs, firstresult=False):
+ self.methods = list(methods)
+ self.kwargs = kwargs
+ self.results = []
+ self.firstresult = firstresult
+
+ def __repr__(self):
+ status = "%d results, %d meths" % (len(self.results), len(self.methods))
+ return "<MultiCall %s, kwargs=%r>" %(status, self.kwargs)
+
+ def execute(self):
+ while self.methods:
+ method = self.methods.pop()
+ kwargs = self.getkwargs(method)
+ res = method(**kwargs)
+ if res is not None:
+ self.results.append(res)
+ if self.firstresult:
+ return res
+ if not self.firstresult:
+ return self.results
+
+ def getkwargs(self, method):
+ kwargs = {}
+ for argname in varnames(method):
+ try:
+ kwargs[argname] = self.kwargs[argname]
+ except KeyError:
+ if argname == "__multicall__":
+ kwargs[argname] = self
+ return kwargs
+
+def varnames(func):
+ if not inspect.isfunction(func) and not inspect.ismethod(func):
+ func = getattr(func, '__call__', func)
+ ismethod = inspect.ismethod(func)
+ rawcode = py.code.getrawcode(func)
+ try:
+ return rawcode.co_varnames[ismethod:rawcode.co_argcount]
+ except AttributeError:
+ return ()
+
+class HookRelay:
+ def __init__(self, hookspecs, pm, prefix="pytest_"):
+ if not isinstance(hookspecs, list):
+ hookspecs = [hookspecs]
+ self._hookspecs = []
+ self._pm = pm
+ self.trace = pm.trace.root.get("hook")
+ for hookspec in hookspecs:
+ self._addhooks(hookspec, prefix)
+
+ def _addhooks(self, hookspecs, prefix):
+ self._hookspecs.append(hookspecs)
+ added = False
+ for name, method in vars(hookspecs).items():
+ if name.startswith(prefix):
+ firstresult = getattr(method, 'firstresult', False)
+ hc = HookCaller(self, name, firstresult=firstresult)
+ setattr(self, name, hc)
+ added = True
+ #print ("setting new hook", name)
+ if not added:
+ raise ValueError("did not find new %r hooks in %r" %(
+ prefix, hookspecs,))
+
+
+class HookCaller:
+ def __init__(self, hookrelay, name, firstresult):
+ self.hookrelay = hookrelay
+ self.name = name
+ self.firstresult = firstresult
+ self.trace = self.hookrelay.trace
+
+ def __repr__(self):
+ return "<HookCaller %r>" %(self.name,)
+
+ def __call__(self, **kwargs):
+ methods = self.hookrelay._pm.listattr(self.name)
+ return self._docall(methods, kwargs)
+
+ def pcall(self, plugins, **kwargs):
+ methods = self.hookrelay._pm.listattr(self.name, plugins=plugins)
+ return self._docall(methods, kwargs)
+
+ def _docall(self, methods, kwargs):
+ self.trace(self.name, kwargs)
+ self.trace.root.indent += 1
+ mc = MultiCall(methods, kwargs, firstresult=self.firstresult)
+ try:
+ res = mc.execute()
+ if res:
+ self.trace("finish", self.name, "-->", res)
+ finally:
+ self.trace.root.indent -= 1
+ return res
+
+_preinit = []
+
+def _preloadplugins():
+ _preinit.append(PluginManager(load=True))
+
+def main(args=None, plugins=None):
+ """ returned exit code integer, after an in-process testing run
+ with the given command line arguments, preloading an optional list
+ of passed in plugin objects. """
+ if args is None:
+ args = sys.argv[1:]
+ elif isinstance(args, py.path.local):
+ args = [str(args)]
+ elif not isinstance(args, (tuple, list)):
+ if not isinstance(args, str):
+ raise ValueError("not a string or argument list: %r" % (args,))
+ args = py.std.shlex.split(args)
+ if _preinit:
+ _pluginmanager = _preinit.pop(0)
+ else: # subsequent calls to main will create a fresh instance
+ _pluginmanager = PluginManager(load=True)
+ hook = _pluginmanager.hook
+ try:
+ if plugins:
+ for plugin in plugins:
+ _pluginmanager.register(plugin)
+ config = hook.pytest_cmdline_parse(
+ pluginmanager=_pluginmanager, args=args)
+ exitstatus = hook.pytest_cmdline_main(config=config)
+ except UsageError:
+ e = sys.exc_info()[1]
+ sys.stderr.write("ERROR: %s\n" %(e.args[0],))
+ exitstatus = 3
+ return exitstatus
+
+class UsageError(Exception):
+ """ error in py.test usage or invocation"""
+
diff --git a/_pytest/doctest.py b/_pytest/doctest.py
new file mode 100644
index 0000000000..1378544ba5
--- /dev/null
+++ b/_pytest/doctest.py
@@ -0,0 +1,87 @@
+""" discover and run doctests in modules and test files."""
+
+import pytest, py
+from py._code.code import TerminalRepr, ReprFileLocation
+
+def pytest_addoption(parser):
+ group = parser.getgroup("collect")
+ group.addoption("--doctest-modules",
+ action="store_true", default=False,
+ help="run doctests in all .py modules",
+ dest="doctestmodules")
+ group.addoption("--doctest-glob",
+ action="store", default="test*.txt", metavar="pat",
+ help="doctests file matching pattern, default: test*.txt",
+ dest="doctestglob")
+
+def pytest_collect_file(path, parent):
+ config = parent.config
+ if path.ext == ".py":
+ if config.option.doctestmodules:
+ return DoctestModule(path, parent)
+ elif (path.ext in ('.txt', '.rst') and parent.session.isinitpath(path)) or \
+ path.check(fnmatch=config.getvalue("doctestglob")):
+ return DoctestTextfile(path, parent)
+
+class ReprFailDoctest(TerminalRepr):
+ def __init__(self, reprlocation, lines):
+ self.reprlocation = reprlocation
+ self.lines = lines
+ def toterminal(self, tw):
+ for line in self.lines:
+ tw.line(line)
+ self.reprlocation.toterminal(tw)
+
+class DoctestItem(pytest.Item):
+ def repr_failure(self, excinfo):
+ doctest = py.std.doctest
+ if excinfo.errisinstance((doctest.DocTestFailure,
+ doctest.UnexpectedException)):
+ doctestfailure = excinfo.value
+ example = doctestfailure.example
+ test = doctestfailure.test
+ filename = test.filename
+ lineno = test.lineno + example.lineno + 1
+ message = excinfo.type.__name__
+ reprlocation = ReprFileLocation(filename, lineno, message)
+ checker = py.std.doctest.OutputChecker()
+ REPORT_UDIFF = py.std.doctest.REPORT_UDIFF
+ filelines = py.path.local(filename).readlines(cr=0)
+ i = max(test.lineno, max(0, lineno - 10)) # XXX?
+ lines = []
+ for line in filelines[i:lineno]:
+ lines.append("%03d %s" % (i+1, line))
+ i += 1
+ if excinfo.errisinstance(doctest.DocTestFailure):
+ lines += checker.output_difference(example,
+ doctestfailure.got, REPORT_UDIFF).split("\n")
+ else:
+ inner_excinfo = py.code.ExceptionInfo(excinfo.value.exc_info)
+ lines += ["UNEXPECTED EXCEPTION: %s" %
+ repr(inner_excinfo.value)]
+
+ return ReprFailDoctest(reprlocation, lines)
+ else:
+ return super(DoctestItem, self).repr_failure(excinfo)
+
+ def reportinfo(self):
+ return self.fspath, None, "[doctest]"
+
+class DoctestTextfile(DoctestItem, pytest.File):
+ def runtest(self):
+ doctest = py.std.doctest
+ failed, tot = doctest.testfile(
+ str(self.fspath), module_relative=False,
+ optionflags=doctest.ELLIPSIS,
+ raise_on_error=True, verbose=0)
+
+class DoctestModule(DoctestItem, pytest.File):
+ def runtest(self):
+ doctest = py.std.doctest
+ if self.fspath.basename == "conftest.py":
+ module = self.config._conftest.importconftest(self.fspath)
+ else:
+ module = self.fspath.pyimport()
+ failed, tot = doctest.testmod(
+ module, raise_on_error=True, verbose=0,
+ optionflags=doctest.ELLIPSIS)
diff --git a/_pytest/genscript.py b/_pytest/genscript.py
new file mode 100755
index 0000000000..8cb9c2f3a3
--- /dev/null
+++ b/_pytest/genscript.py
@@ -0,0 +1,73 @@
+""" generate a single-file self-contained version of py.test """
+import py
+import pickle
+import zlib
+import base64
+
+def find_toplevel(name):
+ for syspath in py.std.sys.path:
+ base = py.path.local(syspath)
+ lib = base/name
+ if lib.check(dir=1):
+ return lib
+ mod = base.join("%s.py" % name)
+ if mod.check(file=1):
+ return mod
+ raise LookupError(name)
+
+def pkgname(toplevel, rootpath, path):
+ parts = path.parts()[len(rootpath.parts()):]
+ return '.'.join([toplevel] + [x.purebasename for x in parts])
+
+def pkg_to_mapping(name):
+ toplevel = find_toplevel(name)
+ name2src = {}
+ if toplevel.check(file=1): # module
+ name2src[toplevel.purebasename] = toplevel.read()
+ else: # package
+ for pyfile in toplevel.visit('*.py'):
+ pkg = pkgname(name, toplevel, pyfile)
+ name2src[pkg] = pyfile.read()
+ return name2src
+
+def compress_mapping(mapping):
+ data = pickle.dumps(mapping, 2)
+ data = zlib.compress(data, 9)
+ data = base64.encodestring(data)
+ data = data.decode('ascii')
+ return data
+
+
+def compress_packages(names):
+ mapping = {}
+ for name in names:
+ mapping.update(pkg_to_mapping(name))
+ return compress_mapping(mapping)
+
+
+def generate_script(entry, packages):
+ data = compress_packages(packages)
+ tmpl = py.path.local(__file__).dirpath().join('standalonetemplate.py')
+ exe = tmpl.read()
+ exe = exe.replace('@SOURCES@', data)
+ exe = exe.replace('@ENTRY@', entry)
+ return exe
+
+
+def pytest_addoption(parser):
+ group = parser.getgroup("debugconfig")
+ group.addoption("--genscript", action="store", default=None,
+ dest="genscript", metavar="path",
+ help="create standalone py.test script at given target path.")
+
+def pytest_cmdline_main(config):
+ genscript = config.getvalue("genscript")
+ if genscript:
+ script = generate_script(
+ 'import py; raise SystemExit(py.test.cmdline.main())',
+ ['py', '_pytest', 'pytest'],
+ )
+
+ genscript = py.path.local(genscript)
+ genscript.write(script)
+ return 0
diff --git a/_pytest/helpconfig.py b/_pytest/helpconfig.py
new file mode 100644
index 0000000000..0e37affcf5
--- /dev/null
+++ b/_pytest/helpconfig.py
@@ -0,0 +1,182 @@
+""" version info, help messages, tracing configuration. """
+import py
+import pytest
+import inspect, sys
+
+def pytest_addoption(parser):
+ group = parser.getgroup('debugconfig')
+ group.addoption('--version', action="store_true",
+ help="display pytest lib version and import information.")
+ group._addoption("-h", "--help", action="store_true", dest="help",
+ help="show help message and configuration info")
+ group._addoption('-p', action="append", dest="plugins", default = [],
+ metavar="name",
+ help="early-load given plugin (multi-allowed).")
+ group.addoption('--traceconfig',
+ action="store_true", dest="traceconfig", default=False,
+ help="trace considerations of conftest.py files."),
+ group._addoption('--nomagic',
+ action="store_true", dest="nomagic", default=False,
+ help="don't reinterpret asserts, no traceback cutting. ")
+ group.addoption('--debug',
+ action="store_true", dest="debug", default=False,
+ help="generate and show internal debugging information.")
+
+
+def pytest_cmdline_main(config):
+ if config.option.version:
+ p = py.path.local(pytest.__file__)
+ sys.stderr.write("This is py.test version %s, imported from %s\n" %
+ (pytest.__version__, p))
+ plugininfo = getpluginversioninfo(config)
+ if plugininfo:
+ for line in plugininfo:
+ sys.stderr.write(line + "\n")
+ return 0
+ elif config.option.help:
+ config.pluginmanager.do_configure(config)
+ showhelp(config)
+ return 0
+
+def showhelp(config):
+ tw = py.io.TerminalWriter()
+ tw.write(config._parser.optparser.format_help())
+ tw.line()
+ tw.line()
+ #tw.sep( "=", "config file settings")
+ tw.line("[pytest] ini-options in the next "
+ "pytest.ini|tox.ini|setup.cfg file:")
+ tw.line()
+
+ for name in config._parser._ininames:
+ help, type, default = config._parser._inidict[name]
+ if type is None:
+ type = "string"
+ spec = "%s (%s)" % (name, type)
+ line = " %-24s %s" %(spec, help)
+ tw.line(line[:tw.fullwidth])
+
+ tw.line() ; tw.line()
+ #tw.sep("=")
+ return
+
+ tw.line("conftest.py options:")
+ tw.line()
+ conftestitems = sorted(config._parser._conftestdict.items())
+ for name, help in conftest_options + conftestitems:
+ line = " %-15s %s" %(name, help)
+ tw.line(line[:tw.fullwidth])
+ tw.line()
+ #tw.sep( "=")
+
+conftest_options = [
+ ('pytest_plugins', 'list of plugin names to load'),
+]
+
+def getpluginversioninfo(config):
+ lines = []
+ plugininfo = config.pluginmanager._plugin_distinfo
+ if plugininfo:
+ lines.append("setuptools registered plugins:")
+ for dist, plugin in plugininfo:
+ loc = getattr(plugin, '__file__', repr(plugin))
+ content = "%s-%s at %s" % (dist.project_name, dist.version, loc)
+ lines.append(" " + content)
+ return lines
+
+def pytest_report_header(config):
+ lines = []
+ if config.option.debug or config.option.traceconfig:
+ lines.append("using: pytest-%s pylib-%s" %
+ (pytest.__version__,py.__version__))
+
+ verinfo = getpluginversioninfo(config)
+ if verinfo:
+ lines.extend(verinfo)
+
+ if config.option.traceconfig:
+ lines.append("active plugins:")
+ plugins = []
+ items = config.pluginmanager._name2plugin.items()
+ for name, plugin in items:
+ if hasattr(plugin, '__file__'):
+ r = plugin.__file__
+ else:
+ r = repr(plugin)
+ lines.append(" %-20s: %s" %(name, r))
+ return lines
+
+
+# =====================================================
+# validate plugin syntax and hooks
+# =====================================================
+
+def pytest_plugin_registered(manager, plugin):
+ methods = collectattr(plugin)
+ hooks = {}
+ for hookspec in manager.hook._hookspecs:
+ hooks.update(collectattr(hookspec))
+
+ stringio = py.io.TextIO()
+ def Print(*args):
+ if args:
+ stringio.write(" ".join(map(str, args)))
+ stringio.write("\n")
+
+ fail = False
+ while methods:
+ name, method = methods.popitem()
+ #print "checking", name
+ if isgenerichook(name):
+ continue
+ if name not in hooks:
+ if not getattr(method, 'optionalhook', False):
+ Print("found unknown hook:", name)
+ fail = True
+ else:
+ #print "checking", method
+ method_args = getargs(method)
+ #print "method_args", method_args
+ if '__multicall__' in method_args:
+ method_args.remove('__multicall__')
+ hook = hooks[name]
+ hookargs = getargs(hook)
+ for arg in method_args:
+ if arg not in hookargs:
+ Print("argument %r not available" %(arg, ))
+ Print("actual definition: %s" %(formatdef(method)))
+ Print("available hook arguments: %s" %
+ ", ".join(hookargs))
+ fail = True
+ break
+ #if not fail:
+ # print "matching hook:", formatdef(method)
+ if fail:
+ name = getattr(plugin, '__name__', plugin)
+ raise PluginValidationError("%s:\n%s" % (name, stringio.getvalue()))
+
+class PluginValidationError(Exception):
+ """ plugin failed validation. """
+
+def isgenerichook(name):
+ return name == "pytest_plugins" or \
+ name.startswith("pytest_funcarg__")
+
+def getargs(func):
+ args = inspect.getargs(py.code.getrawcode(func))[0]
+ startindex = inspect.ismethod(func) and 1 or 0
+ return args[startindex:]
+
+def collectattr(obj):
+ methods = {}
+ for apiname in dir(obj):
+ if apiname.startswith("pytest_"):
+ methods[apiname] = getattr(obj, apiname)
+ return methods
+
+def formatdef(func):
+ return "%s%s" % (
+ func.__name__,
+ inspect.formatargspec(*inspect.getargspec(func))
+ )
+
diff --git a/_pytest/hookspec.py b/_pytest/hookspec.py
new file mode 100644
index 0000000000..580ab27997
--- /dev/null
+++ b/_pytest/hookspec.py
@@ -0,0 +1,222 @@
+""" hook specifications for pytest plugins, invoked from main.py and builtin plugins. """
+
+# -------------------------------------------------------------------------
+# Initialization
+# -------------------------------------------------------------------------
+
+def pytest_addhooks(pluginmanager):
+ """called at plugin load time to allow adding new hooks via a call to
+ pluginmanager.registerhooks(module)."""
+
+
+def pytest_namespace():
+ """return dict of name->object to be made globally available in
+ the py.test/pytest namespace. This hook is called before command
+ line options are parsed.
+ """
+
+def pytest_cmdline_parse(pluginmanager, args):
+ """return initialized config object, parsing the specified args. """
+pytest_cmdline_parse.firstresult = True
+
+def pytest_cmdline_preparse(config, args):
+ """modify command line arguments before option parsing. """
+
+def pytest_addoption(parser):
+ """add optparse-style options and ini-style config values via calls
+ to ``parser.addoption`` and ``parser.addini(...)``.
+ """
+
+def pytest_cmdline_main(config):
+ """ called for performing the main command line action. The default
+ implementation will invoke the configure hooks and runtest_mainloop. """
+pytest_cmdline_main.firstresult = True
+
+def pytest_configure(config):
+ """ called after command line options have been parsed.
+ and all plugins and initial conftest files been loaded.
+ """
+
+def pytest_unconfigure(config):
+ """ called before test process is exited. """
+
+def pytest_runtestloop(session):
+ """ called for performing the main runtest loop
+ (after collection finished). """
+pytest_runtestloop.firstresult = True
+
+# -------------------------------------------------------------------------
+# collection hooks
+# -------------------------------------------------------------------------
+
+def pytest_collection(session):
+ """ perform the collection protocol for the given session. """
+pytest_collection.firstresult = True
+
+def pytest_collection_modifyitems(session, config, items):
+ """ called after collection has been performed, may filter or re-order
+ the items in-place."""
+
+def pytest_collection_finish(session):
+ """ called after collection has been performed and modified. """
+
+def pytest_ignore_collect(path, config):
+ """ return True to prevent considering this path for collection.
+ This hook is consulted for all files and directories prior to calling
+ more specific hooks.
+ """
+pytest_ignore_collect.firstresult = True
+
+def pytest_collect_directory(path, parent):
+ """ called before traversing a directory for collection files. """
+pytest_collect_directory.firstresult = True
+
+def pytest_collect_file(path, parent):
+ """ return collection Node or None for the given path. Any new node
+ needs to have the specified ``parent`` as a parent."""
+
+# logging hooks for collection
+def pytest_collectstart(collector):
+ """ collector starts collecting. """
+
+def pytest_itemcollected(item):
+ """ we just collected a test item. """
+
+def pytest_collectreport(report):
+ """ collector finished collecting. """
+
+def pytest_deselected(items):
+ """ called for test items deselected by keyword. """
+
+def pytest_make_collect_report(collector):
+ """ perform ``collector.collect()`` and return a CollectReport. """
+pytest_make_collect_report.firstresult = True
+
+# -------------------------------------------------------------------------
+# Python test function related hooks
+# -------------------------------------------------------------------------
+
+def pytest_pycollect_makemodule(path, parent):
+ """ return a Module collector or None for the given path.
+ This hook will be called for each matching test module path.
+ The pytest_collect_file hook needs to be used if you want to
+ create test modules for files that do not match as a test module.
+ """
+pytest_pycollect_makemodule.firstresult = True
+
+def pytest_pycollect_makeitem(collector, name, obj):
+ """ return custom item/collector for a python object in a module, or None. """
+pytest_pycollect_makeitem.firstresult = True
+
+def pytest_pyfunc_call(pyfuncitem):
+ """ call underlying test function. """
+pytest_pyfunc_call.firstresult = True
+
+def pytest_generate_tests(metafunc):
+ """ generate (multiple) parametrized calls to a test function."""
+
+# -------------------------------------------------------------------------
+# generic runtest related hooks
+# -------------------------------------------------------------------------
+def pytest_itemstart(item, node=None):
+ """ (deprecated, use pytest_runtest_logstart). """
+
+def pytest_runtest_protocol(item):
+ """ implements the standard runtest_setup/call/teardown protocol including
+ capturing exceptions and calling reporting hooks on the results accordingly.
+
+ :return boolean: True if no further hook implementations should be invoked.
+ """
+pytest_runtest_protocol.firstresult = True
+
+def pytest_runtest_logstart(nodeid, location):
+ """ signal the start of a test run. """
+
+def pytest_runtest_setup(item):
+ """ called before ``pytest_runtest_call(item)``. """
+
+def pytest_runtest_call(item):
+ """ called to execute the test ``item``. """
+
+def pytest_runtest_teardown(item):
+ """ called after ``pytest_runtest_call``. """
+
+def pytest_runtest_makereport(item, call):
+ """ return a :py:class:`_pytest.runner.TestReport` object
+ for the given :py:class:`pytest.Item` and
+ :py:class:`_pytest.runner.CallInfo`.
+ """
+pytest_runtest_makereport.firstresult = True
+
+def pytest_runtest_logreport(report):
+ """ process item test report. """
+
+# special handling for final teardown - somewhat internal for now
+def pytest__teardown_final(session):
+ """ called before test session finishes. """
+pytest__teardown_final.firstresult = True
+
+def pytest__teardown_final_logerror(report, session):
+ """ called if runtest_teardown_final failed. """
+
+# -------------------------------------------------------------------------
+# test session related hooks
+# -------------------------------------------------------------------------
+
+def pytest_sessionstart(session):
+ """ before session.main() is called. """
+
+def pytest_sessionfinish(session, exitstatus):
+ """ whole test run finishes. """
+
+
+# -------------------------------------------------------------------------
+# hooks for customising the assert methods
+# -------------------------------------------------------------------------
+
+def pytest_assertrepr_compare(config, op, left, right):
+ """return explanation for comparisons in failing assert expressions.
+
+ Return None for no custom explanation, otherwise return a list
+ of strings. The strings will be joined by newlines but any newlines
+ *in* a string will be escaped. Note that all but the first line will
+ be indented sligthly, the intention is for the first line to be a summary.
+ """
+
+# -------------------------------------------------------------------------
+# hooks for influencing reporting (invoked from _pytest_terminal)
+# -------------------------------------------------------------------------
+
+def pytest_report_header(config):
+ """ return a string to be displayed as header info for terminal reporting."""
+
+def pytest_report_teststatus(report):
+ """ return result-category, shortletter and verbose word for reporting."""
+pytest_report_teststatus.firstresult = True
+
+def pytest_terminal_summary(terminalreporter):
+ """ add additional section in terminal summary reporting. """
+
+# -------------------------------------------------------------------------
+# doctest hooks
+# -------------------------------------------------------------------------
+
+def pytest_doctest_prepare_content(content):
+ """ return processed content for a given doctest"""
+pytest_doctest_prepare_content.firstresult = True
+
+# -------------------------------------------------------------------------
+# error handling and internal debugging hooks
+# -------------------------------------------------------------------------
+
+def pytest_plugin_registered(plugin, manager):
+ """ a new py lib plugin got registered. """
+
+def pytest_plugin_unregistered(plugin):
+ """ a py lib plugin got unregistered. """
+
+def pytest_internalerror(excrepr):
+ """ called for internal errors. """
+
+def pytest_keyboard_interrupt(excinfo):
+ """ called for keyboard interrupt. """
diff --git a/_pytest/junitxml.py b/_pytest/junitxml.py
new file mode 100644
index 0000000000..db6c5a8b26
--- /dev/null
+++ b/_pytest/junitxml.py
@@ -0,0 +1,173 @@
+""" report test results in JUnit-XML format, for use with Hudson and build integration servers.
+
+Based on initial code from Ross Lawley.
+"""
+
+import py
+import os
+import time
+
+def pytest_addoption(parser):
+ group = parser.getgroup("terminal reporting")
+ group.addoption('--junitxml', action="store", dest="xmlpath",
+ metavar="path", default=None,
+ help="create junit-xml style report file at given path.")
+ group.addoption('--junitprefix', action="store", dest="junitprefix",
+ metavar="str", default=None,
+ help="prepend prefix to classnames in junit-xml output")
+
+def pytest_configure(config):
+ xmlpath = config.option.xmlpath
+ if xmlpath:
+ config._xml = LogXML(xmlpath, config.option.junitprefix)
+ config.pluginmanager.register(config._xml)
+
+def pytest_unconfigure(config):
+ xml = getattr(config, '_xml', None)
+ if xml:
+ del config._xml
+ config.pluginmanager.unregister(xml)
+
+class LogXML(object):
+ def __init__(self, logfile, prefix):
+ self.logfile = logfile
+ self.prefix = prefix
+ self.test_logs = []
+ self.passed = self.skipped = 0
+ self.failed = self.errors = 0
+ self._durations = {}
+
+ def _opentestcase(self, report):
+ names = report.nodeid.split("::")
+ names[0] = names[0].replace("/", '.')
+ names = tuple(names)
+ d = {'time': self._durations.pop(names, "0")}
+ names = [x.replace(".py", "") for x in names if x != "()"]
+ classnames = names[:-1]
+ if self.prefix:
+ classnames.insert(0, self.prefix)
+ d['classname'] = ".".join(classnames)
+ d['name'] = py.xml.escape(names[-1])
+ attrs = ['%s="%s"' % item for item in sorted(d.items())]
+ self.test_logs.append("\n<testcase %s>" % " ".join(attrs))
+
+ def _closetestcase(self):
+ self.test_logs.append("</testcase>")
+
+ def appendlog(self, fmt, *args):
+ args = tuple([py.xml.escape(arg) for arg in args])
+ self.test_logs.append(fmt % args)
+
+ def append_pass(self, report):
+ self.passed += 1
+ self._opentestcase(report)
+ self._closetestcase()
+
+ def append_failure(self, report):
+ self._opentestcase(report)
+ #msg = str(report.longrepr.reprtraceback.extraline)
+ if "xfail" in report.keywords:
+ self.appendlog(
+ '<skipped message="xfail-marked test passes unexpectedly"/>')
+ self.skipped += 1
+ else:
+ self.appendlog('<failure message="test failure">%s</failure>',
+ report.longrepr)
+ self.failed += 1
+ self._closetestcase()
+
+ def append_collect_failure(self, report):
+ self._opentestcase(report)
+ #msg = str(report.longrepr.reprtraceback.extraline)
+ self.appendlog('<failure message="collection failure">%s</failure>',
+ report.longrepr)
+ self._closetestcase()
+ self.errors += 1
+
+ def append_collect_skipped(self, report):
+ self._opentestcase(report)
+ #msg = str(report.longrepr.reprtraceback.extraline)
+ self.appendlog('<skipped message="collection skipped">%s</skipped>',
+ report.longrepr)
+ self._closetestcase()
+ self.skipped += 1
+
+ def append_error(self, report):
+ self._opentestcase(report)
+ self.appendlog('<error message="test setup failure">%s</error>',
+ report.longrepr)
+ self._closetestcase()
+ self.errors += 1
+
+ def append_skipped(self, report):
+ self._opentestcase(report)
+ if "xfail" in report.keywords:
+ self.appendlog(
+ '<skipped message="expected test failure">%s</skipped>',
+ report.keywords['xfail'])
+ else:
+ self.appendlog("<skipped/>")
+ self._closetestcase()
+ self.skipped += 1
+
+ def pytest_runtest_logreport(self, report):
+ if report.passed:
+ self.append_pass(report)
+ elif report.failed:
+ if report.when != "call":
+ self.append_error(report)
+ else:
+ self.append_failure(report)
+ elif report.skipped:
+ self.append_skipped(report)
+
+ def pytest_runtest_call(self, item, __multicall__):
+ names = tuple(item.listnames())
+ start = time.time()
+ try:
+ return __multicall__.execute()
+ finally:
+ self._durations[names] = time.time() - start
+
+ def pytest_collectreport(self, report):
+ if not report.passed:
+ if report.failed:
+ self.append_collect_failure(report)
+ else:
+ self.append_collect_skipped(report)
+
+ def pytest_internalerror(self, excrepr):
+ self.errors += 1
+ data = py.xml.escape(excrepr)
+ self.test_logs.append(
+ '\n<testcase classname="pytest" name="internal">'
+ ' <error message="internal error">'
+ '%s</error></testcase>' % data)
+
+ def pytest_sessionstart(self, session):
+ self.suite_start_time = time.time()
+
+ def pytest_sessionfinish(self, session, exitstatus, __multicall__):
+ if py.std.sys.version_info[0] < 3:
+ logfile = py.std.codecs.open(self.logfile, 'w', encoding='utf-8')
+ else:
+ logfile = open(self.logfile, 'w', encoding='utf-8')
+
+ suite_stop_time = time.time()
+ suite_time_delta = suite_stop_time - self.suite_start_time
+ numtests = self.passed + self.failed
+ logfile.write('<?xml version="1.0" encoding="utf-8"?>')
+ logfile.write('<testsuite ')
+ logfile.write('name="" ')
+ logfile.write('errors="%i" ' % self.errors)
+ logfile.write('failures="%i" ' % self.failed)
+ logfile.write('skips="%i" ' % self.skipped)
+ logfile.write('tests="%i" ' % numtests)
+ logfile.write('time="%.3f"' % suite_time_delta)
+ logfile.write(' >')
+ logfile.writelines(self.test_logs)
+ logfile.write('</testsuite>')
+ logfile.close()
+
+ def pytest_terminal_summary(self, terminalreporter):
+ terminalreporter.write_sep("-", "generated xml file: %s" % (self.logfile))
diff --git a/_pytest/main.py b/_pytest/main.py
new file mode 100644
index 0000000000..f1be30601c
--- /dev/null
+++ b/_pytest/main.py
@@ -0,0 +1,517 @@
+""" core implementation of testing process: init, session, runtest loop. """
+
+import py
+import pytest, _pytest
+import os, sys
+tracebackcutdir = py.path.local(_pytest.__file__).dirpath()
+
+# exitcodes for the command line
+EXIT_OK = 0
+EXIT_TESTSFAILED = 1
+EXIT_INTERRUPTED = 2
+EXIT_INTERNALERROR = 3
+
+def pytest_addoption(parser):
+ parser.addini("norecursedirs", "directory patterns to avoid for recursion",
+ type="args", default=('.*', 'CVS', '_darcs', '{arch}'))
+ #parser.addini("dirpatterns",
+ # "patterns specifying possible locations of test files",
+ # type="linelist", default=["**/test_*.txt",
+ # "**/test_*.py", "**/*_test.py"]
+ #)
+ group = parser.getgroup("general", "running and selection options")
+ group._addoption('-x', '--exitfirst', action="store_true", default=False,
+ dest="exitfirst",
+ help="exit instantly on first error or failed test."),
+ group._addoption('--maxfail', metavar="num",
+ action="store", type="int", dest="maxfail", default=0,
+ help="exit after first num failures or errors.")
+
+ group = parser.getgroup("collect", "collection")
+ group.addoption('--collectonly',
+ action="store_true", dest="collectonly",
+ help="only collect tests, don't execute them."),
+ group.addoption('--pyargs', action="store_true",
+ help="try to interpret all arguments as python packages.")
+ group.addoption("--ignore", action="append", metavar="path",
+ help="ignore path during collection (multi-allowed).")
+ group.addoption('--confcutdir', dest="confcutdir", default=None,
+ metavar="dir",
+ help="only load conftest.py's relative to specified dir.")
+
+ group = parser.getgroup("debugconfig",
+ "test session debugging and configuration")
+ group.addoption('--basetemp', dest="basetemp", default=None, metavar="dir",
+ help="base temporary directory for this test run.")
+
+
+def pytest_namespace():
+ return dict(collect=dict(Item=Item, Collector=Collector, File=File))
+
+def pytest_configure(config):
+ py.test.config = config # compatibiltiy
+ if config.option.exitfirst:
+ config.option.maxfail = 1
+
+def pytest_cmdline_main(config):
+ """ default command line protocol for initialization, session,
+ running tests and reporting. """
+ session = Session(config)
+ session.exitstatus = EXIT_OK
+ try:
+ config.pluginmanager.do_configure(config)
+ config.hook.pytest_sessionstart(session=session)
+ config.hook.pytest_collection(session=session)
+ config.hook.pytest_runtestloop(session=session)
+ except pytest.UsageError:
+ raise
+ except KeyboardInterrupt:
+ excinfo = py.code.ExceptionInfo()
+ config.hook.pytest_keyboard_interrupt(excinfo=excinfo)
+ session.exitstatus = EXIT_INTERRUPTED
+ except:
+ excinfo = py.code.ExceptionInfo()
+ config.pluginmanager.notify_exception(excinfo)
+ session.exitstatus = EXIT_INTERNALERROR
+ if excinfo.errisinstance(SystemExit):
+ sys.stderr.write("mainloop: caught Spurious SystemExit!\n")
+ if not session.exitstatus and session._testsfailed:
+ session.exitstatus = EXIT_TESTSFAILED
+ config.hook.pytest_sessionfinish(session=session,
+ exitstatus=session.exitstatus)
+ config.pluginmanager.do_unconfigure(config)
+ return session.exitstatus
+
+def pytest_collection(session):
+ session.perform_collect()
+ hook = session.config.hook
+ hook.pytest_collection_modifyitems(session=session,
+ config=session.config, items=session.items)
+ hook.pytest_collection_finish(session=session)
+ return True
+
+def pytest_runtestloop(session):
+ if session.config.option.collectonly:
+ return True
+ for item in session.session.items:
+ item.config.hook.pytest_runtest_protocol(item=item)
+ if session.shouldstop:
+ raise session.Interrupted(session.shouldstop)
+ return True
+
+def pytest_ignore_collect(path, config):
+ p = path.dirpath()
+ ignore_paths = config._getconftest_pathlist("collect_ignore", path=p)
+ ignore_paths = ignore_paths or []
+ excludeopt = config.getvalue("ignore")
+ if excludeopt:
+ ignore_paths.extend([py.path.local(x) for x in excludeopt])
+ return path in ignore_paths
+
+class HookProxy:
+ def __init__(self, fspath, config):
+ self.fspath = fspath
+ self.config = config
+ def __getattr__(self, name):
+ hookmethod = getattr(self.config.hook, name)
+ def call_matching_hooks(**kwargs):
+ plugins = self.config._getmatchingplugins(self.fspath)
+ return hookmethod.pcall(plugins, **kwargs)
+ return call_matching_hooks
+
+def compatproperty(name):
+ def fget(self):
+ #print "retrieving %r property from %s" %(name, self.fspath)
+ py.log._apiwarn("2.0", "use pytest.%s for "
+ "test collection and item classes" % name)
+ return getattr(pytest, name)
+ return property(fget, None, None,
+ "deprecated attribute %r, use pytest.%s" % (name,name))
+
+class Node(object):
+ """ base class for all Nodes in the collection tree.
+ Collector subclasses have children, Items are terminal nodes."""
+
+ def __init__(self, name, parent=None, config=None, session=None):
+ #: a unique name with the scope of the parent
+ self.name = name
+
+ #: the parent collector node.
+ self.parent = parent
+
+ #: the test config object
+ self.config = config or parent.config
+
+ #: the collection this node is part of
+ self.session = session or parent.session
+
+ #: filesystem path where this node was collected from
+ self.fspath = getattr(parent, 'fspath', None)
+ self.ihook = self.session.gethookproxy(self.fspath)
+ self.keywords = {self.name: True}
+
+ Module = compatproperty("Module")
+ Class = compatproperty("Class")
+ Instance = compatproperty("Instance")
+ Function = compatproperty("Function")
+ File = compatproperty("File")
+ Item = compatproperty("Item")
+
+ def __repr__(self):
+ return "<%s %r>" %(self.__class__.__name__, getattr(self, 'name', None))
+
+ # methods for ordering nodes
+ @property
+ def nodeid(self):
+ try:
+ return self._nodeid
+ except AttributeError:
+ self._nodeid = x = self._makeid()
+ return x
+
+ def _makeid(self):
+ return self.parent.nodeid + "::" + self.name
+
+ def __eq__(self, other):
+ if not isinstance(other, Node):
+ return False
+ return self.__class__ == other.__class__ and \
+ self.name == other.name and self.parent == other.parent
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __hash__(self):
+ return hash((self.name, self.parent))
+
+ def setup(self):
+ pass
+
+ def teardown(self):
+ pass
+
+ def _memoizedcall(self, attrname, function):
+ exattrname = "_ex_" + attrname
+ failure = getattr(self, exattrname, None)
+ if failure is not None:
+ py.builtin._reraise(failure[0], failure[1], failure[2])
+ if hasattr(self, attrname):
+ return getattr(self, attrname)
+ try:
+ res = function()
+ except py.builtin._sysex:
+ raise
+ except:
+ failure = py.std.sys.exc_info()
+ setattr(self, exattrname, failure)
+ raise
+ setattr(self, attrname, res)
+ return res
+
+ def listchain(self):
+ """ return list of all parent collectors up to self,
+ starting from root of collection tree. """
+ l = [self]
+ while 1:
+ x = l[0]
+ if x.parent is not None: # and x.parent.parent is not None:
+ l.insert(0, x.parent)
+ else:
+ return l
+
+ def listnames(self):
+ return [x.name for x in self.listchain()]
+
+ def getplugins(self):
+ return self.config._getmatchingplugins(self.fspath)
+
+ def getparent(self, cls):
+ current = self
+ while current and not isinstance(current, cls):
+ current = current.parent
+ return current
+
+ def _prunetraceback(self, excinfo):
+ pass
+
+ def _repr_failure_py(self, excinfo, style=None):
+ if self.config.option.fulltrace:
+ style="long"
+ else:
+ self._prunetraceback(excinfo)
+ # XXX should excinfo.getrepr record all data and toterminal()
+ # process it?
+ if style is None:
+ if self.config.option.tbstyle == "short":
+ style = "short"
+ else:
+ style = "long"
+ return excinfo.getrepr(funcargs=True,
+ showlocals=self.config.option.showlocals,
+ style=style)
+
+ repr_failure = _repr_failure_py
+
+class Collector(Node):
+ """ Collector instances create children through collect()
+ and thus iteratively build a tree.
+ """
+ class CollectError(Exception):
+ """ an error during collection, contains a custom message. """
+
+ def collect(self):
+ """ returns a list of children (items and collectors)
+ for this collection node.
+ """
+ raise NotImplementedError("abstract")
+
+ def repr_failure(self, excinfo):
+ """ represent a collection failure. """
+ if excinfo.errisinstance(self.CollectError):
+ exc = excinfo.value
+ return str(exc.args[0])
+ return self._repr_failure_py(excinfo, style="short")
+
+ def _memocollect(self):
+ """ internal helper method to cache results of calling collect(). """
+ return self._memoizedcall('_collected', lambda: list(self.collect()))
+
+ def _prunetraceback(self, excinfo):
+ if hasattr(self, 'fspath'):
+ path = self.fspath
+ traceback = excinfo.traceback
+ ntraceback = traceback.cut(path=self.fspath)
+ if ntraceback == traceback:
+ ntraceback = ntraceback.cut(excludepath=tracebackcutdir)
+ excinfo.traceback = ntraceback.filter()
+
+class FSCollector(Collector):
+ def __init__(self, fspath, parent=None, config=None, session=None):
+ fspath = py.path.local(fspath) # xxx only for test_resultlog.py?
+ name = fspath.basename
+ if parent is not None:
+ rel = fspath.relto(parent.fspath)
+ if rel:
+ name = rel
+ name = name.replace(os.sep, "/")
+ super(FSCollector, self).__init__(name, parent, config, session)
+ self.fspath = fspath
+
+ def _makeid(self):
+ if self == self.session:
+ return "."
+ relpath = self.session.fspath.bestrelpath(self.fspath)
+ if os.sep != "/":
+ relpath = relpath.replace(os.sep, "/")
+ return relpath
+
+class File(FSCollector):
+ """ base class for collecting tests from a file. """
+
+class Item(Node):
+ """ a basic test invocation item. Note that for a single function
+ there might be multiple test invocation items.
+ """
+ def reportinfo(self):
+ return self.fspath, None, ""
+
+ @property
+ def location(self):
+ try:
+ return self._location
+ except AttributeError:
+ location = self.reportinfo()
+ fspath = self.session.fspath.bestrelpath(location[0])
+ location = (fspath, location[1], str(location[2]))
+ self._location = location
+ return location
+
+class NoMatch(Exception):
+ """ raised if matching cannot locate a matching names. """
+
+class Session(FSCollector):
+ class Interrupted(KeyboardInterrupt):
+ """ signals an interrupted test run. """
+ __module__ = 'builtins' # for py3
+
+ def __init__(self, config):
+ super(Session, self).__init__(py.path.local(), parent=None,
+ config=config, session=self)
+ assert self.config.pluginmanager.register(self, name="session", prepend=True)
+ self._testsfailed = 0
+ self.shouldstop = False
+ self.trace = config.trace.root.get("collection")
+ self._norecursepatterns = config.getini("norecursedirs")
+
+ def pytest_collectstart(self):
+ if self.shouldstop:
+ raise self.Interrupted(self.shouldstop)
+
+ def pytest_runtest_logreport(self, report):
+ if report.failed and 'xfail' not in getattr(report, 'keywords', []):
+ self._testsfailed += 1
+ maxfail = self.config.getvalue("maxfail")
+ if maxfail and self._testsfailed >= maxfail:
+ self.shouldstop = "stopping after %d failures" % (
+ self._testsfailed)
+ pytest_collectreport = pytest_runtest_logreport
+
+ def isinitpath(self, path):
+ return path in self._initialpaths
+
+ def gethookproxy(self, fspath):
+ return HookProxy(fspath, self.config)
+
+ def perform_collect(self, args=None, genitems=True):
+ if args is None:
+ args = self.config.args
+ self.trace("perform_collect", self, args)
+ self.trace.root.indent += 1
+ self._notfound = []
+ self._initialpaths = set()
+ self._initialparts = []
+ for arg in args:
+ parts = self._parsearg(arg)
+ self._initialparts.append(parts)
+ self._initialpaths.add(parts[0])
+ self.ihook.pytest_collectstart(collector=self)
+ rep = self.ihook.pytest_make_collect_report(collector=self)
+ self.ihook.pytest_collectreport(report=rep)
+ self.trace.root.indent -= 1
+ if self._notfound:
+ for arg, exc in self._notfound:
+ line = "(no name %r in any of %r)" % (arg, exc.args[0])
+ raise pytest.UsageError("not found: %s\n%s" %(arg, line))
+ if not genitems:
+ return rep.result
+ else:
+ self.items = items = []
+ if rep.passed:
+ for node in rep.result:
+ self.items.extend(self.genitems(node))
+ return items
+
+ def collect(self):
+ for parts in self._initialparts:
+ arg = "::".join(map(str, parts))
+ self.trace("processing argument", arg)
+ self.trace.root.indent += 1
+ try:
+ for x in self._collect(arg):
+ yield x
+ except NoMatch:
+ # we are inside a make_report hook so
+ # we cannot directly pass through the exception
+ self._notfound.append((arg, sys.exc_info()[1]))
+ self.trace.root.indent -= 1
+ break
+ self.trace.root.indent -= 1
+
+ def _collect(self, arg):
+ names = self._parsearg(arg)
+ path = names.pop(0)
+ if path.check(dir=1):
+ assert not names, "invalid arg %r" %(arg,)
+ for path in path.visit(fil=lambda x: x.check(file=1),
+ rec=self._recurse, bf=True, sort=True):
+ for x in self._collectfile(path):
+ yield x
+ else:
+ assert path.check(file=1)
+ for x in self.matchnodes(self._collectfile(path), names):
+ yield x
+
+ def _collectfile(self, path):
+ ihook = self.gethookproxy(path)
+ if not self.isinitpath(path):
+ if ihook.pytest_ignore_collect(path=path, config=self.config):
+ return ()
+ return ihook.pytest_collect_file(path=path, parent=self)
+
+ def _recurse(self, path):
+ ihook = self.gethookproxy(path.dirpath())
+ if ihook.pytest_ignore_collect(path=path, config=self.config):
+ return
+ for pat in self._norecursepatterns:
+ if path.check(fnmatch=pat):
+ return False
+ ihook = self.gethookproxy(path)
+ ihook.pytest_collect_directory(path=path, parent=self)
+ return True
+
+ def _tryconvertpyarg(self, x):
+ try:
+ mod = __import__(x, None, None, ['__doc__'])
+ except (ValueError, ImportError):
+ return x
+ p = py.path.local(mod.__file__)
+ if p.purebasename == "__init__":
+ p = p.dirpath()
+ else:
+ p = p.new(basename=p.purebasename+".py")
+ return p
+
+ def _parsearg(self, arg):
+ """ return (fspath, names) tuple after checking the file exists. """
+ arg = str(arg)
+ if self.config.option.pyargs:
+ arg = self._tryconvertpyarg(arg)
+ parts = str(arg).split("::")
+ relpath = parts[0].replace("/", os.sep)
+ path = self.fspath.join(relpath, abs=True)
+ if not path.check():
+ if self.config.option.pyargs:
+ msg = "file or package not found: "
+ else:
+ msg = "file not found: "
+ raise pytest.UsageError(msg + arg)
+ parts[0] = path
+ return parts
+
+ def matchnodes(self, matching, names):
+ self.trace("matchnodes", matching, names)
+ self.trace.root.indent += 1
+ nodes = self._matchnodes(matching, names)
+ num = len(nodes)
+ self.trace("matchnodes finished -> ", num, "nodes")
+ self.trace.root.indent -= 1
+ if num == 0:
+ raise NoMatch(matching, names[:1])
+ return nodes
+
+ def _matchnodes(self, matching, names):
+ if not matching or not names:
+ return matching
+ name = names[0]
+ assert name
+ nextnames = names[1:]
+ resultnodes = []
+ for node in matching:
+ if isinstance(node, pytest.Item):
+ if not names:
+ resultnodes.append(node)
+ continue
+ assert isinstance(node, pytest.Collector)
+ node.ihook.pytest_collectstart(collector=node)
+ rep = node.ihook.pytest_make_collect_report(collector=node)
+ if rep.passed:
+ for x in rep.result:
+ if x.name == name:
+ resultnodes.extend(self.matchnodes([x], nextnames))
+ node.ihook.pytest_collectreport(report=rep)
+ return resultnodes
+
+ def genitems(self, node):
+ self.trace("genitems", node)
+ if isinstance(node, pytest.Item):
+ node.ihook.pytest_itemcollected(item=node)
+ yield node
+ else:
+ assert isinstance(node, pytest.Collector)
+ node.ihook.pytest_collectstart(collector=node)
+ rep = node.ihook.pytest_make_collect_report(collector=node)
+ if rep.passed:
+ for subnode in rep.result:
+ for x in self.genitems(subnode):
+ yield x
+ node.ihook.pytest_collectreport(report=rep)
diff --git a/_pytest/mark.py b/_pytest/mark.py
new file mode 100644
index 0000000000..5383e3a9d5
--- /dev/null
+++ b/_pytest/mark.py
@@ -0,0 +1,176 @@
+""" generic mechanism for marking and selecting python functions. """
+import pytest, py
+
+def pytest_namespace():
+ return {'mark': MarkGenerator()}
+
+def pytest_addoption(parser):
+ group = parser.getgroup("general")
+ group._addoption('-k',
+ action="store", dest="keyword", default='', metavar="KEYWORDEXPR",
+ help="only run tests which match given keyword expression. "
+ "An expression consists of space-separated terms. "
+ "Each term must match. Precede a term with '-' to negate. "
+ "Terminate expression with ':' to make the first match match "
+ "all subsequent tests (usually file-order). ")
+
+def pytest_collection_modifyitems(items, config):
+ keywordexpr = config.option.keyword
+ if not keywordexpr:
+ return
+ selectuntil = False
+ if keywordexpr[-1] == ":":
+ selectuntil = True
+ keywordexpr = keywordexpr[:-1]
+
+ remaining = []
+ deselected = []
+ for colitem in items:
+ if keywordexpr and skipbykeyword(colitem, keywordexpr):
+ deselected.append(colitem)
+ else:
+ remaining.append(colitem)
+ if selectuntil:
+ keywordexpr = None
+
+ if deselected:
+ config.hook.pytest_deselected(items=deselected)
+ items[:] = remaining
+
+def skipbykeyword(colitem, keywordexpr):
+ """ return True if they given keyword expression means to
+ skip this collector/item.
+ """
+ if not keywordexpr:
+ return
+
+ itemkeywords = getkeywords(colitem)
+ for key in filter(None, keywordexpr.split()):
+ eor = key[:1] == '-'
+ if eor:
+ key = key[1:]
+ if not (eor ^ matchonekeyword(key, itemkeywords)):
+ return True
+
+def getkeywords(node):
+ keywords = {}
+ while node is not None:
+ keywords.update(node.keywords)
+ node = node.parent
+ return keywords
+
+
+def matchonekeyword(key, itemkeywords):
+ for elem in key.split("."):
+ for kw in itemkeywords:
+ if elem in kw:
+ break
+ else:
+ return False
+ return True
+
+class MarkGenerator:
+ """ Factory for :class:`MarkDecorator` objects - exposed as
+ a ``py.test.mark`` singleton instance. Example::
+
+ import py
+ @py.test.mark.slowtest
+ def test_function():
+ pass
+
+ will set a 'slowtest' :class:`MarkInfo` object
+ on the ``test_function`` object. """
+
+ def __getattr__(self, name):
+ if name[0] == "_":
+ raise AttributeError(name)
+ return MarkDecorator(name)
+
+class MarkDecorator:
+ """ A decorator for test functions and test classes. When applied
+ it will create :class:`MarkInfo` objects which may be
+ :ref:`retrieved by hooks as item keywords` MarkDecorator instances
+ are usually created by writing::
+
+ mark1 = py.test.mark.NAME # simple MarkDecorator
+ mark2 = py.test.mark.NAME(name1=value) # parametrized MarkDecorator
+
+ and can then be applied as decorators to test functions::
+
+ @mark2
+ def test_function():
+ pass
+ """
+ def __init__(self, name, args=None, kwargs=None):
+ self.markname = name
+ self.args = args or ()
+ self.kwargs = kwargs or {}
+
+ def __repr__(self):
+ d = self.__dict__.copy()
+ name = d.pop('markname')
+ return "<MarkDecorator %r %r>" %(name, d)
+
+ def __call__(self, *args, **kwargs):
+ """ if passed a single callable argument: decorate it with mark info.
+ otherwise add *args/**kwargs in-place to mark information. """
+ if args:
+ func = args[0]
+ if len(args) == 1 and hasattr(func, '__call__') or \
+ hasattr(func, '__bases__'):
+ if hasattr(func, '__bases__'):
+ if hasattr(func, 'pytestmark'):
+ l = func.pytestmark
+ if not isinstance(l, list):
+ func.pytestmark = [l, self]
+ else:
+ l.append(self)
+ else:
+ func.pytestmark = [self]
+ else:
+ holder = getattr(func, self.markname, None)
+ if holder is None:
+ holder = MarkInfo(self.markname, self.args, self.kwargs)
+ setattr(func, self.markname, holder)
+ else:
+ holder.kwargs.update(self.kwargs)
+ holder.args += self.args
+ return func
+ kw = self.kwargs.copy()
+ kw.update(kwargs)
+ args = self.args + args
+ return self.__class__(self.markname, args=args, kwargs=kw)
+
+class MarkInfo:
+ """ Marking object created by :class:`MarkDecorator` instances. """
+ def __init__(self, name, args, kwargs):
+ #: name of attribute
+ self.name = name
+ #: positional argument list, empty if none specified
+ self.args = args
+ #: keyword argument dictionary, empty if nothing specified
+ self.kwargs = kwargs
+
+ def __repr__(self):
+ return "<MarkInfo %r args=%r kwargs=%r>" % (
+ self._name, self.args, self.kwargs)
+
+def pytest_itemcollected(item):
+ if not isinstance(item, pytest.Function):
+ return
+ try:
+ func = item.obj.__func__
+ except AttributeError:
+ func = getattr(item.obj, 'im_func', item.obj)
+ pyclasses = (pytest.Class, pytest.Module)
+ for node in item.listchain():
+ if isinstance(node, pyclasses):
+ marker = getattr(node.obj, 'pytestmark', None)
+ if marker is not None:
+ if isinstance(marker, list):
+ for mark in marker:
+ mark(func)
+ else:
+ marker(func)
+ node = node.parent
+ item.keywords.update(py.builtin._getfuncdict(func))
diff --git a/_pytest/monkeypatch.py b/_pytest/monkeypatch.py
new file mode 100644
index 0000000000..1ab0b382f6
--- /dev/null
+++ b/_pytest/monkeypatch.py
@@ -0,0 +1,103 @@
+""" monkeypatching and mocking functionality. """
+
+import os, sys
+
+def pytest_funcarg__monkeypatch(request):
+ """The returned ``monkeypatch`` funcarg provides these
+ helper methods to modify objects, dictionaries or os.environ::
+
+ monkeypatch.setattr(obj, name, value, raising=True)
+ monkeypatch.delattr(obj, name, raising=True)
+ monkeypatch.setitem(mapping, name, value)
+ monkeypatch.delitem(obj, name, raising=True)
+ monkeypatch.setenv(name, value, prepend=False)
+ monkeypatch.delenv(name, value, raising=True)
+ monkeypatch.syspath_prepend(path)
+
+ All modifications will be undone when the requesting
+ test function finished its execution. The ``raising``
+ parameter determines if a KeyError or AttributeError
+ will be raised if the set/deletion operation has no target.
+ """
+ mpatch = monkeypatch()
+ request.addfinalizer(mpatch.undo)
+ return mpatch
+
+notset = object()
+
+class monkeypatch:
+ """ object keeping a record of setattr/item/env/syspath changes. """
+ def __init__(self):
+ self._setattr = []
+ self._setitem = []
+
+ def setattr(self, obj, name, value, raising=True):
+ """ set attribute ``name`` on ``obj`` to ``value``, by default
+ raise AttributeEror if the attribute did not exist. """
+ oldval = getattr(obj, name, notset)
+ if raising and oldval is notset:
+ raise AttributeError("%r has no attribute %r" %(obj, name))
+ self._setattr.insert(0, (obj, name, oldval))
+ setattr(obj, name, value)
+
+ def delattr(self, obj, name, raising=True):
+ """ delete attribute ``name`` from ``obj``, by default raise
+ AttributeError it the attribute did not previously exist. """
+ if not hasattr(obj, name):
+ if raising:
+ raise AttributeError(name)
+ else:
+ self._setattr.insert(0, (obj, name, getattr(obj, name, notset)))
+ delattr(obj, name)
+
+ def setitem(self, dic, name, value):
+ """ set dictionary entry ``name`` to value. """
+ self._setitem.insert(0, (dic, name, dic.get(name, notset)))
+ dic[name] = value
+
+ def delitem(self, dic, name, raising=True):
+ """ delete ``name`` from dict, raise KeyError if it doesn't exist."""
+ if name not in dic:
+ if raising:
+ raise KeyError(name)
+ else:
+ self._setitem.insert(0, (dic, name, dic.get(name, notset)))
+ del dic[name]
+
+ def setenv(self, name, value, prepend=None):
+ """ set environment variable ``name`` to ``value``. if ``prepend``
+ is a character, read the current environment variable value
+ and prepend the ``value`` adjoined with the ``prepend`` character."""
+ value = str(value)
+ if prepend and name in os.environ:
+ value = value + prepend + os.environ[name]
+ self.setitem(os.environ, name, value)
+
+ def delenv(self, name, raising=True):
+ """ delete ``name`` from environment, raise KeyError it not exists."""
+ self.delitem(os.environ, name, raising=raising)
+
+ def syspath_prepend(self, path):
+ """ prepend ``path`` to ``sys.path`` list of import locations. """
+ if not hasattr(self, '_savesyspath'):
+ self._savesyspath = sys.path[:]
+ sys.path.insert(0, str(path))
+
+ def undo(self):
+ """ undo previous changes. This call consumes the
+ undo stack. Calling it a second time has no effect unless
+ you do more monkeypatching after the undo call."""
+ for obj, name, value in self._setattr:
+ if value is not notset:
+ setattr(obj, name, value)
+ else:
+ delattr(obj, name)
+ self._setattr[:] = []
+ for dictionary, name, value in self._setitem:
+ if value is notset:
+ del dictionary[name]
+ else:
+ dictionary[name] = value
+ self._setitem[:] = []
+ if hasattr(self, '_savesyspath'):
+ sys.path[:] = self._savesyspath
diff --git a/_pytest/nose.py b/_pytest/nose.py
new file mode 100644
index 0000000000..2f90a93f8d
--- /dev/null
+++ b/_pytest/nose.py
@@ -0,0 +1,47 @@
+""" run test suites written for nose. """
+
+import pytest, py
+import inspect
+import sys
+
+def pytest_runtest_makereport(__multicall__, item, call):
+ SkipTest = getattr(sys.modules.get('nose', None), 'SkipTest', None)
+ if SkipTest:
+ if call.excinfo and call.excinfo.errisinstance(SkipTest):
+ # let's substitute the excinfo with a py.test.skip one
+ call2 = call.__class__(lambda: py.test.skip(str(call.excinfo.value)), call.when)
+ call.excinfo = call2.excinfo
+
+
+def pytest_runtest_setup(item):
+ if isinstance(item, (pytest.Function)):
+ if isinstance(item.parent, pytest.Generator):
+ gen = item.parent
+ if not hasattr(gen, '_nosegensetup'):
+ call_optional(gen.obj, 'setup')
+ if isinstance(gen.parent, pytest.Instance):
+ call_optional(gen.parent.obj, 'setup')
+ gen._nosegensetup = True
+ if not call_optional(item.obj, 'setup'):
+ # call module level setup if there is no object level one
+ call_optional(item.parent.obj, 'setup')
+
+def pytest_runtest_teardown(item):
+ if isinstance(item, pytest.Function):
+ if not call_optional(item.obj, 'teardown'):
+ call_optional(item.parent.obj, 'teardown')
+ #if hasattr(item.parent, '_nosegensetup'):
+ # #call_optional(item._nosegensetup, 'teardown')
+ # del item.parent._nosegensetup
+
+def pytest_make_collect_report(collector):
+ if isinstance(collector, pytest.Generator):
+ call_optional(collector.obj, 'setup')
+
+def call_optional(obj, name):
+ method = getattr(obj, name, None)
+ if method:
+ # If there's any problems allow the exception to raise rather than
+ # silently ignoring them
+ method()
+ return True
diff --git a/_pytest/pastebin.py b/_pytest/pastebin.py
new file mode 100644
index 0000000000..52d7a65e60
--- /dev/null
+++ b/_pytest/pastebin.py
@@ -0,0 +1,63 @@
+""" submit failure or test session information to a pastebin service. """
+import py, sys
+
+class url:
+ base = "http://paste.pocoo.org"
+ xmlrpc = base + "/xmlrpc/"
+ show = base + "/show/"
+
+def pytest_addoption(parser):
+ group = parser.getgroup("terminal reporting")
+ group._addoption('--pastebin', metavar="mode",
+ action='store', dest="pastebin", default=None,
+ type="choice", choices=['failed', 'all'],
+ help="send failed|all info to Pocoo pastebin service.")
+
+def pytest_configure(__multicall__, config):
+ import tempfile
+ __multicall__.execute()
+ if config.option.pastebin == "all":
+ config._pastebinfile = tempfile.TemporaryFile('w+')
+ tr = config.pluginmanager.getplugin('terminalreporter')
+ oldwrite = tr._tw.write
+ def tee_write(s, **kwargs):
+ oldwrite(s, **kwargs)
+ config._pastebinfile.write(str(s))
+ tr._tw.write = tee_write
+
+def pytest_unconfigure(config):
+ if hasattr(config, '_pastebinfile'):
+ config._pastebinfile.seek(0)
+ sessionlog = config._pastebinfile.read()
+ config._pastebinfile.close()
+ del config._pastebinfile
+ proxyid = getproxy().newPaste("python", sessionlog)
+ pastebinurl = "%s%s" % (url.show, proxyid)
+ sys.stderr.write("pastebin session-log: %s\n" % pastebinurl)
+ tr = config.pluginmanager.getplugin('terminalreporter')
+ del tr._tw.__dict__['write']
+
+def getproxy():
+ return py.std.xmlrpclib.ServerProxy(url.xmlrpc).pastes
+
+def pytest_terminal_summary(terminalreporter):
+ if terminalreporter.config.option.pastebin != "failed":
+ return
+ tr = terminalreporter
+ if 'failed' in tr.stats:
+ terminalreporter.write_sep("=", "Sending information to Paste Service")
+ if tr.config.option.debug:
+ terminalreporter.write_line("xmlrpcurl: %s" %(url.xmlrpc,))
+ serverproxy = getproxy()
+ for rep in terminalreporter.stats.get('failed'):
+ try:
+ msg = rep.longrepr.reprtraceback.reprentries[-1].reprfileloc
+ except AttributeError:
+ msg = tr._getfailureheadline(rep)
+ tw = py.io.TerminalWriter(stringio=True)
+ rep.toterminal(tw)
+ s = tw.stringio.getvalue()
+ assert len(s)
+ proxyid = serverproxy.newPaste("python", s)
+ pastebinurl = "%s%s" % (url.show, proxyid)
+ tr.write_line("%s --> %s" %(msg, pastebinurl))
diff --git a/_pytest/pdb.py b/_pytest/pdb.py
new file mode 100644
index 0000000000..507f93d7a3
--- /dev/null
+++ b/_pytest/pdb.py
@@ -0,0 +1,76 @@
+""" interactive debugging with PDB, the Python Debugger. """
+
+import pytest, py
+import sys
+
+def pytest_addoption(parser):
+ group = parser.getgroup("general")
+ group._addoption('--pdb',
+ action="store_true", dest="usepdb", default=False,
+ help="start the interactive Python debugger on errors.")
+
+def pytest_namespace():
+ return {'set_trace': pytestPDB().set_trace}
+
+def pytest_configure(config):
+ if config.getvalue("usepdb"):
+ config.pluginmanager.register(PdbInvoke(), 'pdbinvoke')
+
+class pytestPDB:
+ """ Pseudo PDB that defers to the real pdb. """
+ item = None
+
+ def set_trace(self):
+ """ invoke PDB set_trace debugging, dropping any IO capturing. """
+ frame = sys._getframe().f_back
+ item = getattr(self, 'item', None)
+ if item is not None:
+ capman = item.config.pluginmanager.getplugin("capturemanager")
+ out, err = capman.suspendcapture()
+ if hasattr(item, 'outerr'):
+ item.outerr = (item.outerr[0] + out, item.outerr[1] + err)
+ tw = py.io.TerminalWriter()
+ tw.line()
+ tw.sep(">", "PDB set_trace (IO-capturing turned off)")
+ py.std.pdb.Pdb().set_trace(frame)
+
+def pdbitem(item):
+ pytestPDB.item = item
+pytest_runtest_setup = pytest_runtest_call = pytest_runtest_teardown = pdbitem
+
+def pytest_runtest_makereport():
+ pytestPDB.item = None
+
+class PdbInvoke:
+ @pytest.mark.tryfirst
+ def pytest_runtest_makereport(self, item, call, __multicall__):
+ rep = __multicall__.execute()
+ if not call.excinfo or \
+ call.excinfo.errisinstance(pytest.skip.Exception) or \
+ call.excinfo.errisinstance(py.std.bdb.BdbQuit):
+ return rep
+ if "xfail" in rep.keywords:
+ return rep
+ # we assume that the above execute() suspended capturing
+ tw = py.io.TerminalWriter()
+ tw.line()
+ tw.sep(">", "traceback")
+ rep.toterminal(tw)
+ tw.sep(">", "entering PDB")
+ post_mortem(call.excinfo._excinfo[2])
+ rep._pdbshown = True
+ return rep
+
+def post_mortem(t):
+ pdb = py.std.pdb
+ class Pdb(pdb.Pdb):
+ def get_stack(self, f, t):
+ stack, i = pdb.Pdb.get_stack(self, f, t)
+ if f is None:
+ i = max(0, len(stack) - 1)
+ while i and stack[i][0].f_locals.get("__tracebackhide__", False):
+ i-=1
+ return stack, i
+ p = Pdb()
+ p.reset()
+ p.interaction(None, t)
diff --git a/_pytest/pytester.py b/_pytest/pytester.py
new file mode 100644
index 0000000000..f9da97b188
--- /dev/null
+++ b/_pytest/pytester.py
@@ -0,0 +1,674 @@
+""" (disabled by default) support for testing py.test and py.test plugins. """
+
+import py, pytest
+import sys, os
+import re
+import inspect
+import time
+from fnmatch import fnmatch
+from _pytest.main import Session
+from py.builtin import print_
+from _pytest.core import HookRelay
+
+def pytest_addoption(parser):
+ group = parser.getgroup("pylib")
+ group.addoption('--no-tools-on-path',
+ action="store_true", dest="notoolsonpath", default=False,
+ help=("discover tools on PATH instead of going through py.cmdline.")
+ )
+
+def pytest_configure(config):
+ # This might be called multiple times. Only take the first.
+ global _pytest_fullpath
+ import pytest
+ try:
+ _pytest_fullpath
+ except NameError:
+ _pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc"))
+
+def pytest_funcarg___pytest(request):
+ return PytestArg(request)
+
+class PytestArg:
+ def __init__(self, request):
+ self.request = request
+
+ def gethookrecorder(self, hook):
+ hookrecorder = HookRecorder(hook._pm)
+ hookrecorder.start_recording(hook._hookspecs)
+ self.request.addfinalizer(hookrecorder.finish_recording)
+ return hookrecorder
+
+class ParsedCall:
+ def __init__(self, name, locals):
+ assert '_name' not in locals
+ self.__dict__.update(locals)
+ self.__dict__.pop('self')
+ self._name = name
+
+ def __repr__(self):
+ d = self.__dict__.copy()
+ del d['_name']
+ return "<ParsedCall %r(**%r)>" %(self._name, d)
+
+class HookRecorder:
+ def __init__(self, pluginmanager):
+ self._pluginmanager = pluginmanager
+ self.calls = []
+ self._recorders = {}
+
+ def start_recording(self, hookspecs):
+ if not isinstance(hookspecs, (list, tuple)):
+ hookspecs = [hookspecs]
+ for hookspec in hookspecs:
+ assert hookspec not in self._recorders
+ class RecordCalls:
+ _recorder = self
+ for name, method in vars(hookspec).items():
+ if name[0] != "_":
+ setattr(RecordCalls, name, self._makecallparser(method))
+ recorder = RecordCalls()
+ self._recorders[hookspec] = recorder
+ self._pluginmanager.register(recorder)
+ self.hook = HookRelay(hookspecs, pm=self._pluginmanager,
+ prefix="pytest_")
+
+ def finish_recording(self):
+ for recorder in self._recorders.values():
+ self._pluginmanager.unregister(recorder)
+ self._recorders.clear()
+
+ def _makecallparser(self, method):
+ name = method.__name__
+ args, varargs, varkw, default = py.std.inspect.getargspec(method)
+ if not args or args[0] != "self":
+ args.insert(0, 'self')
+ fspec = py.std.inspect.formatargspec(args, varargs, varkw, default)
+ # we use exec because we want to have early type
+ # errors on wrong input arguments, using
+ # *args/**kwargs delays this and gives errors
+ # elsewhere
+ exec (py.code.compile("""
+ def %(name)s%(fspec)s:
+ self._recorder.calls.append(
+ ParsedCall(%(name)r, locals()))
+ """ % locals()))
+ return locals()[name]
+
+ def getcalls(self, names):
+ if isinstance(names, str):
+ names = names.split()
+ for name in names:
+ for cls in self._recorders:
+ if name in vars(cls):
+ break
+ else:
+ raise ValueError("callname %r not found in %r" %(
+ name, self._recorders.keys()))
+ l = []
+ for call in self.calls:
+ if call._name in names:
+ l.append(call)
+ return l
+
+ def contains(self, entries):
+ __tracebackhide__ = True
+ from py.builtin import print_
+ i = 0
+ entries = list(entries)
+ backlocals = py.std.sys._getframe(1).f_locals
+ while entries:
+ name, check = entries.pop(0)
+ for ind, call in enumerate(self.calls[i:]):
+ if call._name == name:
+ print_("NAMEMATCH", name, call)
+ if eval(check, backlocals, call.__dict__):
+ print_("CHECKERMATCH", repr(check), "->", call)
+ else:
+ print_("NOCHECKERMATCH", repr(check), "-", call)
+ continue
+ i += ind + 1
+ break
+ print_("NONAMEMATCH", name, "with", call)
+ else:
+ py.test.fail("could not find %r check %r" % (name, check))
+
+ def popcall(self, name):
+ __tracebackhide__ = True
+ for i, call in enumerate(self.calls):
+ if call._name == name:
+ del self.calls[i]
+ return call
+ lines = ["could not find call %r, in:" % (name,)]
+ lines.extend([" %s" % str(x) for x in self.calls])
+ py.test.fail("\n".join(lines))
+
+ def getcall(self, name):
+ l = self.getcalls(name)
+ assert len(l) == 1, (name, l)
+ return l[0]
+
+
+def pytest_funcarg__linecomp(request):
+ return LineComp()
+
+def pytest_funcarg__LineMatcher(request):
+ return LineMatcher
+
+def pytest_funcarg__testdir(request):
+ tmptestdir = TmpTestdir(request)
+ return tmptestdir
+
+rex_outcome = re.compile("(\d+) (\w+)")
+class RunResult:
+ def __init__(self, ret, outlines, errlines, duration):
+ self.ret = ret
+ self.outlines = outlines
+ self.errlines = errlines
+ self.stdout = LineMatcher(outlines)
+ self.stderr = LineMatcher(errlines)
+ self.duration = duration
+
+ def parseoutcomes(self):
+ for line in reversed(self.outlines):
+ if 'seconds' in line:
+ outcomes = rex_outcome.findall(line)
+ if outcomes:
+ d = {}
+ for num, cat in outcomes:
+ d[cat] = int(num)
+ return d
+
+class TmpTestdir:
+ def __init__(self, request):
+ self.request = request
+ self.Config = request.config.__class__
+ self._pytest = request.getfuncargvalue("_pytest")
+ # XXX remove duplication with tmpdir plugin
+ basetmp = request.config._tmpdirhandler.ensuretemp("testdir")
+ name = request.function.__name__
+ for i in range(100):
+ try:
+ tmpdir = basetmp.mkdir(name + str(i))
+ except py.error.EEXIST:
+ continue
+ break
+ # we need to create another subdir
+ # because Directory.collect() currently loads
+ # conftest.py from sibling directories
+ self.tmpdir = tmpdir.mkdir(name)
+ self.plugins = []
+ self._syspathremove = []
+ self.chdir() # always chdir
+ self.request.addfinalizer(self.finalize)
+
+ def __repr__(self):
+ return "<TmpTestdir %r>" % (self.tmpdir,)
+
+ def finalize(self):
+ for p in self._syspathremove:
+ py.std.sys.path.remove(p)
+ if hasattr(self, '_olddir'):
+ self._olddir.chdir()
+ # delete modules that have been loaded from tmpdir
+ for name, mod in list(sys.modules.items()):
+ if mod:
+ fn = getattr(mod, '__file__', None)
+ if fn and fn.startswith(str(self.tmpdir)):
+ del sys.modules[name]
+
+ def getreportrecorder(self, obj):
+ if hasattr(obj, 'config'):
+ obj = obj.config
+ if hasattr(obj, 'hook'):
+ obj = obj.hook
+ assert hasattr(obj, '_hookspecs'), obj
+ reprec = ReportRecorder(obj)
+ reprec.hookrecorder = self._pytest.gethookrecorder(obj)
+ reprec.hook = reprec.hookrecorder.hook
+ return reprec
+
+ def chdir(self):
+ old = self.tmpdir.chdir()
+ if not hasattr(self, '_olddir'):
+ self._olddir = old
+
+ def _makefile(self, ext, args, kwargs):
+ items = list(kwargs.items())
+ if args:
+ source = "\n".join(map(str, args)) + "\n"
+ basename = self.request.function.__name__
+ items.insert(0, (basename, source))
+ ret = None
+ for name, value in items:
+ p = self.tmpdir.join(name).new(ext=ext)
+ source = str(py.code.Source(value)).lstrip()
+ p.write(source.encode("utf-8"), "wb")
+ if ret is None:
+ ret = p
+ return ret
+
+
+ def makefile(self, ext, *args, **kwargs):
+ return self._makefile(ext, args, kwargs)
+
+ def makeini(self, source):
+ return self.makefile('cfg', setup=source)
+
+ def makeconftest(self, source):
+ return self.makepyfile(conftest=source)
+
+ def makeini(self, source):
+ return self.makefile('.ini', tox=source)
+
+ def getinicfg(self, source):
+ p = self.makeini(source)
+ return py.iniconfig.IniConfig(p)['pytest']
+
+ def makepyfile(self, *args, **kwargs):
+ return self._makefile('.py', args, kwargs)
+
+ def maketxtfile(self, *args, **kwargs):
+ return self._makefile('.txt', args, kwargs)
+
+ def syspathinsert(self, path=None):
+ if path is None:
+ path = self.tmpdir
+ py.std.sys.path.insert(0, str(path))
+ self._syspathremove.append(str(path))
+
+ def mkdir(self, name):
+ return self.tmpdir.mkdir(name)
+
+ def mkpydir(self, name):
+ p = self.mkdir(name)
+ p.ensure("__init__.py")
+ return p
+
+ Session = Session
+ def getnode(self, config, arg):
+ session = Session(config)
+ assert '::' not in str(arg)
+ p = py.path.local(arg)
+ x = session.fspath.bestrelpath(p)
+ return session.perform_collect([x], genitems=False)[0]
+
+ def getpathnode(self, path):
+ config = self.parseconfig(path)
+ session = Session(config)
+ x = session.fspath.bestrelpath(path)
+ return session.perform_collect([x], genitems=False)[0]
+
+ def genitems(self, colitems):
+ session = colitems[0].session
+ result = []
+ for colitem in colitems:
+ result.extend(session.genitems(colitem))
+ return result
+
+ def inline_genitems(self, *args):
+ #config = self.parseconfig(*args)
+ config = self.parseconfigure(*args)
+ rec = self.getreportrecorder(config)
+ session = Session(config)
+ session.perform_collect()
+ return session.items, rec
+
+ def runitem(self, source):
+ # used from runner functional tests
+ item = self.getitem(source)
+ # the test class where we are called from wants to provide the runner
+ testclassinstance = py.builtin._getimself(self.request.function)
+ runner = testclassinstance.getrunner()
+ return runner(item)
+
+ def inline_runsource(self, source, *cmdlineargs):
+ p = self.makepyfile(source)
+ l = list(cmdlineargs) + [p]
+ return self.inline_run(*l)
+
+ def inline_runsource1(self, *args):
+ args = list(args)
+ source = args.pop()
+ p = self.makepyfile(source)
+ l = list(args) + [p]
+ reprec = self.inline_run(*l)
+ reports = reprec.getreports("pytest_runtest_logreport")
+ assert len(reports) == 1, reports
+ return reports[0]
+
+ def inline_run(self, *args):
+ args = ("-s", ) + args # otherwise FD leakage
+ config = self.parseconfig(*args)
+ reprec = self.getreportrecorder(config)
+ #config.pluginmanager.do_configure(config)
+ config.hook.pytest_cmdline_main(config=config)
+ #config.pluginmanager.do_unconfigure(config)
+ return reprec
+
+ def config_preparse(self):
+ config = self.Config()
+ for plugin in self.plugins:
+ if isinstance(plugin, str):
+ config.pluginmanager.import_plugin(plugin)
+ else:
+ if isinstance(plugin, dict):
+ plugin = PseudoPlugin(plugin)
+ if not config.pluginmanager.isregistered(plugin):
+ config.pluginmanager.register(plugin)
+ return config
+
+ def parseconfig(self, *args):
+ if not args:
+ args = (self.tmpdir,)
+ config = self.config_preparse()
+ args = list(args)
+ for x in args:
+ if str(x).startswith('--basetemp'):
+ break
+ else:
+ args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp'))
+ config.parse(args)
+ return config
+
+ def reparseconfig(self, args=None):
+ """ this is used from tests that want to re-invoke parse(). """
+ if not args:
+ args = [self.tmpdir]
+ oldconfig = getattr(py.test, 'config', None)
+ try:
+ c = py.test.config = self.Config()
+ c.basetemp = py.path.local.make_numbered_dir(prefix="reparse",
+ keep=0, rootdir=self.tmpdir, lock_timeout=None)
+ c.parse(args)
+ return c
+ finally:
+ py.test.config = oldconfig
+
+ def parseconfigure(self, *args):
+ config = self.parseconfig(*args)
+ config.pluginmanager.do_configure(config)
+ self.request.addfinalizer(lambda:
+ config.pluginmanager.do_unconfigure(config))
+ return config
+
+ def getitem(self, source, funcname="test_func"):
+ for item in self.getitems(source):
+ if item.name == funcname:
+ return item
+ assert 0, "%r item not found in module:\n%s" %(funcname, source)
+
+ def getitems(self, source):
+ modcol = self.getmodulecol(source)
+ return self.genitems([modcol])
+
+ def getmodulecol(self, source, configargs=(), withinit=False):
+ kw = {self.request.function.__name__: py.code.Source(source).strip()}
+ path = self.makepyfile(**kw)
+ if withinit:
+ self.makepyfile(__init__ = "#")
+ self.config = config = self.parseconfigure(path, *configargs)
+ node = self.getnode(config, path)
+ #config.pluginmanager.do_unconfigure(config)
+ return node
+
+ def collect_by_name(self, modcol, name):
+ for colitem in modcol._memocollect():
+ if colitem.name == name:
+ return colitem
+
+ def popen(self, cmdargs, stdout, stderr, **kw):
+ env = os.environ.copy()
+ env['PYTHONPATH'] = os.pathsep.join(filter(None, [
+ str(os.getcwd()), env.get('PYTHONPATH', '')]))
+ kw['env'] = env
+ #print "env", env
+ return py.std.subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw)
+
+ def pytestmain(self, *args, **kwargs):
+ ret = pytest.main(*args, **kwargs)
+ if ret == 2:
+ raise KeyboardInterrupt()
+ def run(self, *cmdargs):
+ return self._run(*cmdargs)
+
+ def _run(self, *cmdargs):
+ cmdargs = [str(x) for x in cmdargs]
+ p1 = self.tmpdir.join("stdout")
+ p2 = self.tmpdir.join("stderr")
+ print_("running", cmdargs, "curdir=", py.path.local())
+ f1 = p1.open("wb")
+ f2 = p2.open("wb")
+ now = time.time()
+ popen = self.popen(cmdargs, stdout=f1, stderr=f2,
+ close_fds=(sys.platform != "win32"))
+ ret = popen.wait()
+ f1.close()
+ f2.close()
+ out = p1.read("rb")
+ out = getdecoded(out).splitlines()
+ err = p2.read("rb")
+ err = getdecoded(err).splitlines()
+ def dump_lines(lines, fp):
+ try:
+ for line in lines:
+ py.builtin.print_(line, file=fp)
+ except UnicodeEncodeError:
+ print("couldn't print to %s because of encoding" % (fp,))
+ dump_lines(out, sys.stdout)
+ dump_lines(err, sys.stderr)
+ return RunResult(ret, out, err, time.time()-now)
+
+ def runpybin(self, scriptname, *args):
+ fullargs = self._getpybinargs(scriptname) + args
+ return self.run(*fullargs)
+
+ def _getpybinargs(self, scriptname):
+ if not self.request.config.getvalue("notoolsonpath"):
+ # XXX we rely on script refering to the correct environment
+ # we cannot use "(py.std.sys.executable,script)"
+ # becaue on windows the script is e.g. a py.test.exe
+ return (py.std.sys.executable, _pytest_fullpath,)
+ else:
+ py.test.skip("cannot run %r with --no-tools-on-path" % scriptname)
+
+ def runpython(self, script, prepend=True):
+ if prepend:
+ s = self._getsysprepend()
+ if s:
+ script.write(s + "\n" + script.read())
+ return self.run(sys.executable, script)
+
+ def _getsysprepend(self):
+ if self.request.config.getvalue("notoolsonpath"):
+ s = "import sys;sys.path.insert(0,%r);" % str(py._pydir.dirpath())
+ else:
+ s = ""
+ return s
+
+ def runpython_c(self, command):
+ command = self._getsysprepend() + command
+ return self.run(py.std.sys.executable, "-c", command)
+
+ def runpytest(self, *args):
+ p = py.path.local.make_numbered_dir(prefix="runpytest-",
+ keep=None, rootdir=self.tmpdir)
+ args = ('--basetemp=%s' % p, ) + args
+ #for x in args:
+ # if '--confcutdir' in str(x):
+ # break
+ #else:
+ # pass
+ # args = ('--confcutdir=.',) + args
+ plugins = [x for x in self.plugins if isinstance(x, str)]
+ if plugins:
+ args = ('-p', plugins[0]) + args
+ return self.runpybin("py.test", *args)
+
+ def spawn_pytest(self, string, expect_timeout=10.0):
+ if self.request.config.getvalue("notoolsonpath"):
+ py.test.skip("--no-tools-on-path prevents running pexpect-spawn tests")
+ basetemp = self.tmpdir.mkdir("pexpect")
+ invoke = " ".join(map(str, self._getpybinargs("py.test")))
+ cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
+ return self.spawn(cmd, expect_timeout=expect_timeout)
+
+ def spawn(self, cmd, expect_timeout=10.0):
+ pexpect = py.test.importorskip("pexpect", "2.4")
+ if hasattr(sys, 'pypy_version_info') and '64' in py.std.platform.machine():
+ pytest.skip("pypy-64 bit not supported")
+ logfile = self.tmpdir.join("spawn.out")
+ child = pexpect.spawn(cmd, logfile=logfile.open("w"))
+ child.timeout = expect_timeout
+ return child
+
+def getdecoded(out):
+ try:
+ return out.decode("utf-8")
+ except UnicodeDecodeError:
+ return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
+ py.io.saferepr(out),)
+
+class PseudoPlugin:
+ def __init__(self, vars):
+ self.__dict__.update(vars)
+
+class ReportRecorder(object):
+ def __init__(self, hook):
+ self.hook = hook
+ self.pluginmanager = hook._pm
+ self.pluginmanager.register(self)
+
+ def getcall(self, name):
+ return self.hookrecorder.getcall(name)
+
+ def popcall(self, name):
+ return self.hookrecorder.popcall(name)
+
+ def getcalls(self, names):
+ """ return list of ParsedCall instances matching the given eventname. """
+ return self.hookrecorder.getcalls(names)
+
+ # functionality for test reports
+
+ def getreports(self, names="pytest_runtest_logreport pytest_collectreport"):
+ return [x.report for x in self.getcalls(names)]
+
+ def matchreport(self, inamepart="", names="pytest_runtest_logreport pytest_collectreport", when=None):
+ """ return a testreport whose dotted import path matches """
+ l = []
+ for rep in self.getreports(names=names):
+ if when and getattr(rep, 'when', None) != when:
+ continue
+ if not inamepart or inamepart in rep.nodeid.split("::"):
+ l.append(rep)
+ if not l:
+ raise ValueError("could not find test report matching %r: no test reports at all!" %
+ (inamepart,))
+ if len(l) > 1:
+ raise ValueError("found more than one testreport matching %r: %s" %(
+ inamepart, l))
+ return l[0]
+
+ def getfailures(self, names='pytest_runtest_logreport pytest_collectreport'):
+ return [rep for rep in self.getreports(names) if rep.failed]
+
+ def getfailedcollections(self):
+ return self.getfailures('pytest_collectreport')
+
+ def listoutcomes(self):
+ passed = []
+ skipped = []
+ failed = []
+ for rep in self.getreports("pytest_runtest_logreport"):
+ if rep.passed:
+ if rep.when == "call":
+ passed.append(rep)
+ elif rep.skipped:
+ skipped.append(rep)
+ elif rep.failed:
+ failed.append(rep)
+ return passed, skipped, failed
+
+ def countoutcomes(self):
+ return [len(x) for x in self.listoutcomes()]
+
+ def assertoutcome(self, passed=0, skipped=0, failed=0):
+ realpassed, realskipped, realfailed = self.listoutcomes()
+ assert passed == len(realpassed)
+ assert skipped == len(realskipped)
+ assert failed == len(realfailed)
+
+ def clear(self):
+ self.hookrecorder.calls[:] = []
+
+ def unregister(self):
+ self.pluginmanager.unregister(self)
+ self.hookrecorder.finish_recording()
+
+class LineComp:
+ def __init__(self):
+ self.stringio = py.io.TextIO()
+
+ def assert_contains_lines(self, lines2):
+ """ assert that lines2 are contained (linearly) in lines1.
+ return a list of extralines found.
+ """
+ __tracebackhide__ = True
+ val = self.stringio.getvalue()
+ self.stringio.truncate(0)
+ self.stringio.seek(0)
+ lines1 = val.split("\n")
+ return LineMatcher(lines1).fnmatch_lines(lines2)
+
+class LineMatcher:
+ def __init__(self, lines):
+ self.lines = lines
+
+ def str(self):
+ return "\n".join(self.lines)
+
+ def _getlines(self, lines2):
+ if isinstance(lines2, str):
+ lines2 = py.code.Source(lines2)
+ if isinstance(lines2, py.code.Source):
+ lines2 = lines2.strip().lines
+ return lines2
+
+ def fnmatch_lines_random(self, lines2):
+ lines2 = self._getlines(lines2)
+ for line in lines2:
+ for x in self.lines:
+ if line == x or fnmatch(x, line):
+ print_("matched: ", repr(line))
+ break
+ else:
+ raise ValueError("line %r not found in output" % line)
+
+ def fnmatch_lines(self, lines2):
+ def show(arg1, arg2):
+ py.builtin.print_(arg1, arg2, file=py.std.sys.stderr)
+ lines2 = self._getlines(lines2)
+ lines1 = self.lines[:]
+ nextline = None
+ extralines = []
+ __tracebackhide__ = True
+ for line in lines2:
+ nomatchprinted = False
+ while lines1:
+ nextline = lines1.pop(0)
+ if line == nextline:
+ show("exact match:", repr(line))
+ break
+ elif fnmatch(nextline, line):
+ show("fnmatch:", repr(line))
+ show(" with:", repr(nextline))
+ break
+ else:
+ if not nomatchprinted:
+ show("nomatch:", repr(line))
+ nomatchprinted = True
+ show(" and:", repr(nextline))
+ extralines.append(nextline)
+ else:
+ py.test.fail("remains unmatched: %r, see stderr" % (line,))
diff --git a/_pytest/python.py b/_pytest/python.py
new file mode 100644
index 0000000000..a2b7bb35d1
--- /dev/null
+++ b/_pytest/python.py
@@ -0,0 +1,855 @@
+""" Python test discovery, setup and run of test functions. """
+import py
+import inspect
+import sys
+import pytest
+from py._code.code import TerminalRepr
+
+import _pytest
+cutdir = py.path.local(_pytest.__file__).dirpath()
+
+def pytest_addoption(parser):
+ group = parser.getgroup("general")
+ group.addoption('--funcargs',
+ action="store_true", dest="showfuncargs", default=False,
+ help="show available function arguments, sorted by plugin")
+ parser.addini("python_files", type="args",
+ default=('test_*.py', '*_test.py'),
+ help="glob-style file patterns for Python test module discovery")
+ parser.addini("python_classes", type="args", default=("Test",),
+ help="prefixes for Python test class discovery")
+ parser.addini("python_functions", type="args", default=("test",),
+ help="prefixes for Python test function and method discovery")
+
+def pytest_cmdline_main(config):
+ if config.option.showfuncargs:
+ showfuncargs(config)
+ return 0
+
+@pytest.mark.trylast
+def pytest_namespace():
+ raises.Exception = pytest.fail.Exception
+ return {
+ 'raises' : raises,
+ 'collect': {
+ 'Module': Module, 'Class': Class, 'Instance': Instance,
+ 'Function': Function, 'Generator': Generator,
+ '_fillfuncargs': fillfuncargs}
+ }
+
+def pytest_funcarg__pytestconfig(request):
+ """ the pytest config object with access to command line opts."""
+ return request.config
+
+def pytest_pyfunc_call(__multicall__, pyfuncitem):
+ if not __multicall__.execute():
+ testfunction = pyfuncitem.obj
+ if pyfuncitem._isyieldedfunction():
+ testfunction(*pyfuncitem._args)
+ else:
+ funcargs = pyfuncitem.funcargs
+ testfunction(**funcargs)
+
+def pytest_collect_file(path, parent):
+ ext = path.ext
+ pb = path.purebasename
+ if ext == ".py":
+ if not parent.session.isinitpath(path):
+ for pat in parent.config.getini('python_files'):
+ if path.fnmatch(pat):
+ break
+ else:
+ return
+ return parent.ihook.pytest_pycollect_makemodule(
+ path=path, parent=parent)
+
+def pytest_pycollect_makemodule(path, parent):
+ return Module(path, parent)
+
+def pytest_pycollect_makeitem(__multicall__, collector, name, obj):
+ res = __multicall__.execute()
+ if res is not None:
+ return res
+ if collector._istestclasscandidate(name, obj):
+ #if hasattr(collector.obj, 'unittest'):
+ # return # we assume it's a mixin class for a TestCase derived one
+ return collector.Class(name, parent=collector)
+ elif collector.funcnamefilter(name) and hasattr(obj, '__call__'):
+ if is_generator(obj):
+ return Generator(name, parent=collector)
+ else:
+ return collector._genfunctions(name, obj)
+
+def is_generator(func):
+ try:
+ return py.code.getrawcode(func).co_flags & 32 # generator function
+ except AttributeError: # builtin functions have no bytecode
+ # assume them to not be generators
+ return False
+
+class PyobjMixin(object):
+ def obj():
+ def fget(self):
+ try:
+ return self._obj
+ except AttributeError:
+ self._obj = obj = self._getobj()
+ return obj
+ def fset(self, value):
+ self._obj = value
+ return property(fget, fset, None, "underlying python object")
+ obj = obj()
+
+ def _getobj(self):
+ return getattr(self.parent.obj, self.name)
+
+ def getmodpath(self, stopatmodule=True, includemodule=False):
+ """ return python path relative to the containing module. """
+ chain = self.listchain()
+ chain.reverse()
+ parts = []
+ for node in chain:
+ if isinstance(node, Instance):
+ continue
+ name = node.name
+ if isinstance(node, Module):
+ assert name.endswith(".py")
+ name = name[:-3]
+ if stopatmodule:
+ if includemodule:
+ parts.append(name)
+ break
+ parts.append(name)
+ parts.reverse()
+ s = ".".join(parts)
+ return s.replace(".[", "[")
+
+ def _getfslineno(self):
+ try:
+ return self._fslineno
+ except AttributeError:
+ pass
+ obj = self.obj
+ # xxx let decorators etc specify a sane ordering
+ if hasattr(obj, 'place_as'):
+ obj = obj.place_as
+
+ self._fslineno = py.code.getfslineno(obj)
+ return self._fslineno
+
+ def reportinfo(self):
+ # XXX caching?
+ obj = self.obj
+ if hasattr(obj, 'compat_co_firstlineno'):
+ # nose compatibility
+ fspath = sys.modules[obj.__module__].__file__
+ if fspath.endswith(".pyc"):
+ fspath = fspath[:-1]
+ #assert 0
+ #fn = inspect.getsourcefile(obj) or inspect.getfile(obj)
+ lineno = obj.compat_co_firstlineno
+ modpath = obj.__module__
+ else:
+ fspath, lineno = self._getfslineno()
+ modpath = self.getmodpath()
+ return fspath, lineno, modpath
+
+class PyCollectorMixin(PyobjMixin, pytest.Collector):
+
+ def funcnamefilter(self, name):
+ for prefix in self.config.getini("python_functions"):
+ if name.startswith(prefix):
+ return True
+
+ def classnamefilter(self, name):
+ for prefix in self.config.getini("python_classes"):
+ if name.startswith(prefix):
+ return True
+
+ def collect(self):
+ # NB. we avoid random getattrs and peek in the __dict__ instead
+ # (XXX originally introduced from a PyPy need, still true?)
+ dicts = [getattr(self.obj, '__dict__', {})]
+ for basecls in inspect.getmro(self.obj.__class__):
+ dicts.append(basecls.__dict__)
+ seen = {}
+ l = []
+ for dic in dicts:
+ for name, obj in dic.items():
+ if name in seen:
+ continue
+ seen[name] = True
+ if name[0] != "_":
+ res = self.makeitem(name, obj)
+ if res is None:
+ continue
+ if not isinstance(res, list):
+ res = [res]
+ l.extend(res)
+ l.sort(key=lambda item: item.reportinfo()[:2])
+ return l
+
+ def makeitem(self, name, obj):
+ return self.ihook.pytest_pycollect_makeitem(
+ collector=self, name=name, obj=obj)
+
+ def _istestclasscandidate(self, name, obj):
+ if self.classnamefilter(name) and \
+ inspect.isclass(obj):
+ if hasinit(obj):
+ # XXX WARN
+ return False
+ return True
+
+ def _genfunctions(self, name, funcobj):
+ module = self.getparent(Module).obj
+ clscol = self.getparent(Class)
+ cls = clscol and clscol.obj or None
+ metafunc = Metafunc(funcobj, config=self.config,
+ cls=cls, module=module)
+ gentesthook = self.config.hook.pytest_generate_tests
+ extra = [module]
+ if cls is not None:
+ extra.append(cls())
+ plugins = self.getplugins() + extra
+ gentesthook.pcall(plugins, metafunc=metafunc)
+ if not metafunc._calls:
+ return self.Function(name, parent=self)
+ l = []
+ for callspec in metafunc._calls:
+ subname = "%s[%s]" %(name, callspec.id)
+ function = self.Function(name=subname, parent=self,
+ callspec=callspec, callobj=funcobj, keywords={callspec.id:True})
+ l.append(function)
+ return l
+
+class Module(pytest.File, PyCollectorMixin):
+ def _getobj(self):
+ return self._memoizedcall('_obj', self._importtestmodule)
+
+ def _importtestmodule(self):
+ # we assume we are only called once per module
+ try:
+ mod = self.fspath.pyimport(ensuresyspath=True)
+ except SyntaxError:
+ excinfo = py.code.ExceptionInfo()
+ raise self.CollectError(excinfo.getrepr(style="short"))
+ except self.fspath.ImportMismatchError:
+ e = sys.exc_info()[1]
+ raise self.CollectError(
+ "import file mismatch:\n"
+ "imported module %r has this __file__ attribute:\n"
+ " %s\n"
+ "which is not the same as the test file we want to collect:\n"
+ " %s\n"
+ "HINT: use a unique basename for your test file modules"
+ % e.args
+ )
+ #print "imported test module", mod
+ self.config.pluginmanager.consider_module(mod)
+ return mod
+
+ def setup(self):
+ if hasattr(self.obj, 'setup_module'):
+ #XXX: nose compat hack, move to nose plugin
+ # if it takes a positional arg, its probably a pytest style one
+ # so we pass the current module object
+ if inspect.getargspec(self.obj.setup_module)[0]:
+ self.obj.setup_module(self.obj)
+ else:
+ self.obj.setup_module()
+
+ def teardown(self):
+ if hasattr(self.obj, 'teardown_module'):
+ #XXX: nose compat hack, move to nose plugin
+ # if it takes a positional arg, its probably a py.test style one
+ # so we pass the current module object
+ if inspect.getargspec(self.obj.teardown_module)[0]:
+ self.obj.teardown_module(self.obj)
+ else:
+ self.obj.teardown_module()
+
+class Class(PyCollectorMixin, pytest.Collector):
+
+ def collect(self):
+ return [self.Instance(name="()", parent=self)]
+
+ def setup(self):
+ setup_class = getattr(self.obj, 'setup_class', None)
+ if setup_class is not None:
+ setup_class = getattr(setup_class, 'im_func', setup_class)
+ setup_class(self.obj)
+
+ def teardown(self):
+ teardown_class = getattr(self.obj, 'teardown_class', None)
+ if teardown_class is not None:
+ teardown_class = getattr(teardown_class, 'im_func', teardown_class)
+ teardown_class(self.obj)
+
+class Instance(PyCollectorMixin, pytest.Collector):
+ def _getobj(self):
+ return self.parent.obj()
+
+ def newinstance(self):
+ self.obj = self._getobj()
+ return self.obj
+
+class FunctionMixin(PyobjMixin):
+ """ mixin for the code common to Function and Generator.
+ """
+
+ def setup(self):
+ """ perform setup for this test function. """
+ if inspect.ismethod(self.obj):
+ name = 'setup_method'
+ else:
+ name = 'setup_function'
+ if hasattr(self, '_preservedparent'):
+ obj = self._preservedparent
+ elif isinstance(self.parent, Instance):
+ obj = self.parent.newinstance()
+ self.obj = self._getobj()
+ else:
+ obj = self.parent.obj
+ setup_func_or_method = getattr(obj, name, None)
+ if setup_func_or_method is not None:
+ setup_func_or_method(self.obj)
+
+ def teardown(self):
+ """ perform teardown for this test function. """
+ if inspect.ismethod(self.obj):
+ name = 'teardown_method'
+ else:
+ name = 'teardown_function'
+ obj = self.parent.obj
+ teardown_func_or_meth = getattr(obj, name, None)
+ if teardown_func_or_meth is not None:
+ teardown_func_or_meth(self.obj)
+
+ def _prunetraceback(self, excinfo):
+ if hasattr(self, '_obj') and not self.config.option.fulltrace:
+ code = py.code.Code(self.obj)
+ path, firstlineno = code.path, code.firstlineno
+ traceback = excinfo.traceback
+ ntraceback = traceback.cut(path=path, firstlineno=firstlineno)
+ if ntraceback == traceback:
+ ntraceback = ntraceback.cut(path=path)
+ if ntraceback == traceback:
+ ntraceback = ntraceback.cut(excludepath=cutdir)
+ excinfo.traceback = ntraceback.filter()
+
+ def _repr_failure_py(self, excinfo, style="long"):
+ if excinfo.errisinstance(FuncargRequest.LookupError):
+ fspath, lineno, msg = self.reportinfo()
+ lines, _ = inspect.getsourcelines(self.obj)
+ for i, line in enumerate(lines):
+ if line.strip().startswith('def'):
+ return FuncargLookupErrorRepr(fspath, lineno,
+ lines[:i+1], str(excinfo.value))
+ if excinfo.errisinstance(pytest.fail.Exception):
+ if not excinfo.value.pytrace:
+ return str(excinfo.value)
+ return super(FunctionMixin, self)._repr_failure_py(excinfo,
+ style=style)
+
+ def repr_failure(self, excinfo, outerr=None):
+ assert outerr is None, "XXX outerr usage is deprecated"
+ return self._repr_failure_py(excinfo,
+ style=self.config.option.tbstyle)
+
+class FuncargLookupErrorRepr(TerminalRepr):
+ def __init__(self, filename, firstlineno, deflines, errorstring):
+ self.deflines = deflines
+ self.errorstring = errorstring
+ self.filename = filename
+ self.firstlineno = firstlineno
+
+ def toterminal(self, tw):
+ tw.line()
+ for line in self.deflines:
+ tw.line(" " + line.strip())
+ for line in self.errorstring.split("\n"):
+ tw.line(" " + line.strip(), red=True)
+ tw.line()
+ tw.line("%s:%d" % (self.filename, self.firstlineno+1))
+
+class Generator(FunctionMixin, PyCollectorMixin, pytest.Collector):
+ def collect(self):
+ # test generators are seen as collectors but they also
+ # invoke setup/teardown on popular request
+ # (induced by the common "test_*" naming shared with normal tests)
+ self.config._setupstate.prepare(self)
+ # see FunctionMixin.setup and test_setupstate_is_preserved_134
+ self._preservedparent = self.parent.obj
+ l = []
+ seen = {}
+ for i, x in enumerate(self.obj()):
+ name, call, args = self.getcallargs(x)
+ if not py.builtin.callable(call):
+ raise TypeError("%r yielded non callable test %r" %(self.obj, call,))
+ if name is None:
+ name = "[%d]" % i
+ else:
+ name = "['%s']" % name
+ if name in seen:
+ raise ValueError("%r generated tests with non-unique name %r" %(self, name))
+ seen[name] = True
+ l.append(self.Function(name, self, args=args, callobj=call))
+ return l
+
+ def getcallargs(self, obj):
+ if not isinstance(obj, (tuple, list)):
+ obj = (obj,)
+ # explict naming
+ if isinstance(obj[0], py.builtin._basestring):
+ name = obj[0]
+ obj = obj[1:]
+ else:
+ name = None
+ call, args = obj[0], obj[1:]
+ return name, call, args
+
+
+#
+# Test Items
+#
+_dummy = object()
+class Function(FunctionMixin, pytest.Item):
+ """ a Function Item is responsible for setting up
+ and executing a Python callable test object.
+ """
+ _genid = None
+ def __init__(self, name, parent=None, args=None, config=None,
+ callspec=None, callobj=_dummy, keywords=None, session=None):
+ super(Function, self).__init__(name, parent,
+ config=config, session=session)
+ self._args = args
+ if self._isyieldedfunction():
+ assert not callspec, (
+ "yielded functions (deprecated) cannot have funcargs")
+ else:
+ if callspec is not None:
+ self.funcargs = callspec.funcargs or {}
+ self._genid = callspec.id
+ if hasattr(callspec, "param"):
+ self._requestparam = callspec.param
+ else:
+ self.funcargs = {}
+ if callobj is not _dummy:
+ self._obj = callobj
+ self.function = getattr(self.obj, 'im_func', self.obj)
+ self.keywords.update(py.builtin._getfuncdict(self.obj) or {})
+ if keywords:
+ self.keywords.update(keywords)
+
+ def _getobj(self):
+ name = self.name
+ i = name.find("[") # parametrization
+ if i != -1:
+ name = name[:i]
+ return getattr(self.parent.obj, name)
+
+ def _isyieldedfunction(self):
+ return self._args is not None
+
+ def runtest(self):
+ """ execute the underlying test function. """
+ self.ihook.pytest_pyfunc_call(pyfuncitem=self)
+
+ def setup(self):
+ super(Function, self).setup()
+ if hasattr(self, 'funcargs'):
+ fillfuncargs(self)
+
+ def __eq__(self, other):
+ try:
+ return (self.name == other.name and
+ self._args == other._args and
+ self.parent == other.parent and
+ self.obj == other.obj and
+ getattr(self, '_genid', None) ==
+ getattr(other, '_genid', None)
+ )
+ except AttributeError:
+ pass
+ return False
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __hash__(self):
+ return hash((self.parent, self.name))
+
+def hasinit(obj):
+ init = getattr(obj, '__init__', None)
+ if init:
+ if init != object.__init__:
+ return True
+
+
+def getfuncargnames(function):
+ # XXX merge with main.py's varnames
+ argnames = py.std.inspect.getargs(py.code.getrawcode(function))[0]
+ startindex = py.std.inspect.ismethod(function) and 1 or 0
+ defaults = getattr(function, 'func_defaults',
+ getattr(function, '__defaults__', None)) or ()
+ numdefaults = len(defaults)
+ if numdefaults:
+ return argnames[startindex:-numdefaults]
+ return argnames[startindex:]
+
+def fillfuncargs(function):
+ """ fill missing funcargs. """
+ request = FuncargRequest(pyfuncitem=function)
+ request._fillfuncargs()
+
+_notexists = object()
+class CallSpec:
+ def __init__(self, funcargs, id, param):
+ self.funcargs = funcargs
+ self.id = id
+ if param is not _notexists:
+ self.param = param
+ def __repr__(self):
+ return "<CallSpec id=%r param=%r funcargs=%r>" %(
+ self.id, getattr(self, 'param', '?'), self.funcargs)
+
+class Metafunc:
+ def __init__(self, function, config=None, cls=None, module=None):
+ self.config = config
+ self.module = module
+ self.function = function
+ self.funcargnames = getfuncargnames(function)
+ self.cls = cls
+ self.module = module
+ self._calls = []
+ self._ids = py.builtin.set()
+
+ def addcall(self, funcargs=None, id=_notexists, param=_notexists):
+ """ add a new call to the underlying test function during the
+ collection phase of a test run.
+
+ :arg funcargs: argument keyword dictionary used when invoking
+ the test function.
+
+ :arg id: used for reporting and identification purposes. If you
+ don't supply an `id` the length of the currently
+ list of calls to the test function will be used.
+
+ :arg param: will be exposed to a later funcarg factory invocation
+ through the ``request.param`` attribute. Setting it (instead of
+ directly providing a ``funcargs`` ditionary) is called
+ *indirect parametrization*. Indirect parametrization is
+ preferable if test values are expensive to setup or can
+ only be created after certain fixtures or test-run related
+ initialization code has been run.
+ """
+ assert funcargs is None or isinstance(funcargs, dict)
+ if id is None:
+ raise ValueError("id=None not allowed")
+ if id is _notexists:
+ id = len(self._calls)
+ id = str(id)
+ if id in self._ids:
+ raise ValueError("duplicate id %r" % id)
+ self._ids.add(id)
+ self._calls.append(CallSpec(funcargs, id, param))
+
+class FuncargRequest:
+ """ A request for function arguments from a test function. """
+ _argprefix = "pytest_funcarg__"
+ _argname = None
+
+ class LookupError(LookupError):
+ """ error on performing funcarg request. """
+
+ def __init__(self, pyfuncitem):
+ self._pyfuncitem = pyfuncitem
+ if hasattr(pyfuncitem, '_requestparam'):
+ self.param = pyfuncitem._requestparam
+ extra = [obj for obj in (self.module, self.instance) if obj]
+ self._plugins = pyfuncitem.getplugins() + extra
+ self._funcargs = self._pyfuncitem.funcargs.copy()
+ self._name2factory = {}
+ self._currentarg = None
+
+ @property
+ def function(self):
+ """ function object of the test invocation. """
+ return self._pyfuncitem.obj
+
+ @property
+ def keywords(self):
+ """ keywords of the test function item.
+
+ .. versionadded:: 2.0
+ """
+ return self._pyfuncitem.keywords
+
+ @property
+ def module(self):
+ """ module where the test function was collected. """
+ return self._pyfuncitem.getparent(pytest.Module).obj
+
+ @property
+ def cls(self):
+ """ class (can be None) where the test function was collected. """
+ clscol = self._pyfuncitem.getparent(pytest.Class)
+ if clscol:
+ return clscol.obj
+ @property
+ def instance(self):
+ """ instance (can be None) on which test function was collected. """
+ return py.builtin._getimself(self.function)
+
+ @property
+ def config(self):
+ """ the pytest config object associated with this request. """
+ return self._pyfuncitem.config
+
+ @property
+ def fspath(self):
+ """ the file system path of the test module which collected this test. """
+ return self._pyfuncitem.fspath
+
+ def _fillfuncargs(self):
+ argnames = getfuncargnames(self.function)
+ if argnames:
+ assert not getattr(self._pyfuncitem, '_args', None), (
+ "yielded functions cannot have funcargs")
+ for argname in argnames:
+ if argname not in self._pyfuncitem.funcargs:
+ self._pyfuncitem.funcargs[argname] = self.getfuncargvalue(argname)
+
+
+ def applymarker(self, marker):
+ """ apply a marker to a single test function invocation.
+ This method is useful if you don't want to have a keyword/marker
+ on all function invocations.
+
+ :arg marker: a :py:class:`_pytest.mark.MarkDecorator` object
+ created by a call to ``py.test.mark.NAME(...)``.
+ """
+ if not isinstance(marker, py.test.mark.XYZ.__class__):
+ raise ValueError("%r is not a py.test.mark.* object")
+ self._pyfuncitem.keywords[marker.markname] = marker
+
+ def cached_setup(self, setup, teardown=None, scope="module", extrakey=None):
+ """ return a testing resource managed by ``setup`` &
+ ``teardown`` calls. ``scope`` and ``extrakey`` determine when the
+ ``teardown`` function will be called so that subsequent calls to
+ ``setup`` would recreate the resource.
+
+ :arg teardown: function receiving a previously setup resource.
+ :arg setup: a no-argument function creating a resource.
+ :arg scope: a string value out of ``function``, ``class``, ``module``
+ or ``session`` indicating the caching lifecycle of the resource.
+ :arg extrakey: added to internal caching key of (funcargname, scope).
+ """
+ if not hasattr(self.config, '_setupcache'):
+ self.config._setupcache = {} # XXX weakref?
+ cachekey = (self._currentarg, self._getscopeitem(scope), extrakey)
+ cache = self.config._setupcache
+ try:
+ val = cache[cachekey]
+ except KeyError:
+ val = setup()
+ cache[cachekey] = val
+ if teardown is not None:
+ def finalizer():
+ del cache[cachekey]
+ teardown(val)
+ self._addfinalizer(finalizer, scope=scope)
+ return val
+
+ def getfuncargvalue(self, argname):
+ """ Retrieve a function argument by name for this test
+ function invocation. This allows one function argument factory
+ to call another function argument factory. If there are two
+ funcarg factories for the same test function argument the first
+ factory may use ``getfuncargvalue`` to call the second one and
+ do something additional with the resource.
+ """
+ try:
+ return self._funcargs[argname]
+ except KeyError:
+ pass
+ if argname not in self._name2factory:
+ self._name2factory[argname] = self.config.pluginmanager.listattr(
+ plugins=self._plugins,
+ attrname=self._argprefix + str(argname)
+ )
+ #else: we are called recursively
+ if not self._name2factory[argname]:
+ self._raiselookupfailed(argname)
+ funcargfactory = self._name2factory[argname].pop()
+ oldarg = self._currentarg
+ self._currentarg = argname
+ try:
+ self._funcargs[argname] = res = funcargfactory(request=self)
+ finally:
+ self._currentarg = oldarg
+ return res
+
+ def _getscopeitem(self, scope):
+ if scope == "function":
+ return self._pyfuncitem
+ elif scope == "session":
+ return None
+ elif scope == "class":
+ x = self._pyfuncitem.getparent(pytest.Class)
+ if x is not None:
+ return x
+ scope = "module"
+ if scope == "module":
+ return self._pyfuncitem.getparent(pytest.Module)
+ raise ValueError("unknown finalization scope %r" %(scope,))
+
+ def addfinalizer(self, finalizer):
+ """add finalizer function to be called after test function
+ finished execution. """
+ self._addfinalizer(finalizer, scope="function")
+
+ def _addfinalizer(self, finalizer, scope):
+ colitem = self._getscopeitem(scope)
+ self.config._setupstate.addfinalizer(
+ finalizer=finalizer, colitem=colitem)
+
+ def __repr__(self):
+ return "<FuncargRequest for %r>" %(self._pyfuncitem)
+
+ def _raiselookupfailed(self, argname):
+ available = []
+ for plugin in self._plugins:
+ for name in vars(plugin):
+ if name.startswith(self._argprefix):
+ name = name[len(self._argprefix):]
+ if name not in available:
+ available.append(name)
+ fspath, lineno, msg = self._pyfuncitem.reportinfo()
+ msg = "LookupError: no factory found for function argument %r" % (argname,)
+ msg += "\n available funcargs: %s" %(", ".join(available),)
+ msg += "\n use 'py.test --funcargs [testpath]' for help on them."
+ raise self.LookupError(msg)
+
+def showfuncargs(config):
+ from _pytest.main import Session
+ session = Session(config)
+ session.perform_collect()
+ if session.items:
+ plugins = session.items[0].getplugins()
+ else:
+ plugins = session.getplugins()
+ curdir = py.path.local()
+ tw = py.io.TerminalWriter()
+ verbose = config.getvalue("verbose")
+ for plugin in plugins:
+ available = []
+ for name, factory in vars(plugin).items():
+ if name.startswith(FuncargRequest._argprefix):
+ name = name[len(FuncargRequest._argprefix):]
+ if name not in available:
+ available.append([name, factory])
+ if available:
+ pluginname = plugin.__name__
+ for name, factory in available:
+ loc = getlocation(factory, curdir)
+ if verbose:
+ funcargspec = "%s -- %s" %(name, loc,)
+ else:
+ funcargspec = name
+ tw.line(funcargspec, green=True)
+ doc = factory.__doc__ or ""
+ if doc:
+ for line in doc.split("\n"):
+ tw.line(" " + line.strip())
+ else:
+ tw.line(" %s: no docstring available" %(loc,),
+ red=True)
+
+def getlocation(function, curdir):
+ import inspect
+ fn = py.path.local(inspect.getfile(function))
+ lineno = py.builtin._getcode(function).co_firstlineno
+ if fn.relto(curdir):
+ fn = fn.relto(curdir)
+ return "%s:%d" %(fn, lineno+1)
+
+# builtin pytest.raises helper
+
+def raises(ExpectedException, *args, **kwargs):
+ """ assert that a code block/function call raises @ExpectedException
+ and raise a failure exception otherwise.
+
+ If using Python 2.5 or above, you may use this function as a
+ context manager::
+
+ >>> with raises(ZeroDivisionError):
+ ... 1/0
+
+ Or you can specify a callable by passing a to-be-called lambda::
+
+ >>> raises(ZeroDivisionError, lambda: 1/0)
+ <ExceptionInfo ...>
+
+ or you can specify an arbitrary callable with arguments::
+
+ >>> def f(x): return 1/x
+ ...
+ >>> raises(ZeroDivisionError, f, 0)
+ <ExceptionInfo ...>
+ >>> raises(ZeroDivisionError, f, x=0)
+ <ExceptionInfo ...>
+
+ A third possibility is to use a string which which will
+ be executed::
+
+ >>> raises(ZeroDivisionError, "f(0)")
+ <ExceptionInfo ...>
+ """
+ __tracebackhide__ = True
+
+ if not args:
+ return RaisesContext(ExpectedException)
+ elif isinstance(args[0], str):
+ code, = args
+ assert isinstance(code, str)
+ frame = sys._getframe(1)
+ loc = frame.f_locals.copy()
+ loc.update(kwargs)
+ #print "raises frame scope: %r" % frame.f_locals
+ try:
+ code = py.code.Source(code).compile()
+ py.builtin.exec_(code, frame.f_globals, loc)
+ # XXX didn'T mean f_globals == f_locals something special?
+ # this is destroyed here ...
+ except ExpectedException:
+ return py.code.ExceptionInfo()
+ else:
+ func = args[0]
+ try:
+ func(*args[1:], **kwargs)
+ except ExpectedException:
+ return py.code.ExceptionInfo()
+ k = ", ".join(["%s=%r" % x for x in kwargs.items()])
+ if k:
+ k = ', ' + k
+ expr = '%s(%r%s)' %(getattr(func, '__name__', func), args, k)
+ pytest.fail("DID NOT RAISE")
+
+class RaisesContext(object):
+ def __init__(self, ExpectedException):
+ self.ExpectedException = ExpectedException
+ self.excinfo = None
+
+ def __enter__(self):
+ self.excinfo = object.__new__(py.code.ExceptionInfo)
+ return self.excinfo
+
+ def __exit__(self, *tp):
+ __tracebackhide__ = True
+ if tp[0] is None:
+ pytest.fail("DID NOT RAISE")
+ self.excinfo.__init__(tp)
+ return issubclass(self.excinfo.type, self.ExpectedException)
+
diff --git a/_pytest/recwarn.py b/_pytest/recwarn.py
new file mode 100644
index 0000000000..e2fb2b17e4
--- /dev/null
+++ b/_pytest/recwarn.py
@@ -0,0 +1,96 @@
+""" recording warnings during test function execution. """
+
+import py
+import sys, os
+
+def pytest_funcarg__recwarn(request):
+ """Return a WarningsRecorder instance that provides these methods:
+
+ * ``pop(category=None)``: return last warning matching the category.
+ * ``clear()``: clear list of warnings
+ """
+ if sys.version_info >= (2,7):
+ import warnings
+ oldfilters = warnings.filters[:]
+ warnings.simplefilter('default')
+ def reset_filters():
+ warnings.filters[:] = oldfilters
+ request.addfinalizer(reset_filters)
+ wrec = WarningsRecorder()
+ request.addfinalizer(wrec.finalize)
+ return wrec
+
+def pytest_namespace():
+ return {'deprecated_call': deprecated_call}
+
+def deprecated_call(func, *args, **kwargs):
+ """ assert that calling ``func(*args, **kwargs)``
+ triggers a DeprecationWarning.
+ """
+ warningmodule = py.std.warnings
+ l = []
+ oldwarn_explicit = getattr(warningmodule, 'warn_explicit')
+ def warn_explicit(*args, **kwargs):
+ l.append(args)
+ oldwarn_explicit(*args, **kwargs)
+ oldwarn = getattr(warningmodule, 'warn')
+ def warn(*args, **kwargs):
+ l.append(args)
+ oldwarn(*args, **kwargs)
+
+ warningmodule.warn_explicit = warn_explicit
+ warningmodule.warn = warn
+ try:
+ ret = func(*args, **kwargs)
+ finally:
+ warningmodule.warn_explicit = warn_explicit
+ warningmodule.warn = warn
+ if not l:
+ #print warningmodule
+ __tracebackhide__ = True
+ raise AssertionError("%r did not produce DeprecationWarning" %(func,))
+ return ret
+
+
+class RecordedWarning:
+ def __init__(self, message, category, filename, lineno, line):
+ self.message = message
+ self.category = category
+ self.filename = filename
+ self.lineno = lineno
+ self.line = line
+
+class WarningsRecorder:
+ def __init__(self):
+ warningmodule = py.std.warnings
+ self.list = []
+ def showwarning(message, category, filename, lineno, line=0):
+ self.list.append(RecordedWarning(
+ message, category, filename, lineno, line))
+ try:
+ self.old_showwarning(message, category,
+ filename, lineno, line=line)
+ except TypeError:
+ # < python2.6
+ self.old_showwarning(message, category, filename, lineno)
+ self.old_showwarning = warningmodule.showwarning
+ warningmodule.showwarning = showwarning
+
+ def pop(self, cls=Warning):
+ """ pop the first recorded warning, raise exception if not exists."""
+ for i, w in enumerate(self.list):
+ if issubclass(w.category, cls):
+ return self.list.pop(i)
+ __tracebackhide__ = True
+ assert 0, "%r not found in %r" %(cls, self.list)
+
+ #def resetregistry(self):
+ # import warnings
+ # warnings.onceregistry.clear()
+ # warnings.__warningregistry__.clear()
+
+ def clear(self):
+ self.list[:] = []
+
+ def finalize(self):
+ py.std.warnings.showwarning = self.old_showwarning
diff --git a/_pytest/resultlog.py b/_pytest/resultlog.py
new file mode 100644
index 0000000000..7f879cce59
--- /dev/null
+++ b/_pytest/resultlog.py
@@ -0,0 +1,93 @@
+""" (disabled by default) create result information in a plain text file. """
+
+import py
+
+def pytest_addoption(parser):
+ group = parser.getgroup("terminal reporting", "resultlog plugin options")
+ group.addoption('--resultlog', action="store", dest="resultlog",
+ metavar="path", default=None,
+ help="path for machine-readable result log.")
+
+def pytest_configure(config):
+ resultlog = config.option.resultlog
+ # prevent opening resultlog on slave nodes (xdist)
+ if resultlog and not hasattr(config, 'slaveinput'):
+ logfile = open(resultlog, 'w', 1) # line buffered
+ config._resultlog = ResultLog(config, logfile)
+ config.pluginmanager.register(config._resultlog)
+
+def pytest_unconfigure(config):
+ resultlog = getattr(config, '_resultlog', None)
+ if resultlog:
+ resultlog.logfile.close()
+ del config._resultlog
+ config.pluginmanager.unregister(resultlog)
+
+def generic_path(item):
+ chain = item.listchain()
+ gpath = [chain[0].name]
+ fspath = chain[0].fspath
+ fspart = False
+ for node in chain[1:]:
+ newfspath = node.fspath
+ if newfspath == fspath:
+ if fspart:
+ gpath.append(':')
+ fspart = False
+ else:
+ gpath.append('.')
+ else:
+ gpath.append('/')
+ fspart = True
+ name = node.name
+ if name[0] in '([':
+ gpath.pop()
+ gpath.append(name)
+ fspath = newfspath
+ return ''.join(gpath)
+
+class ResultLog(object):
+ def __init__(self, config, logfile):
+ self.config = config
+ self.logfile = logfile # preferably line buffered
+
+ def write_log_entry(self, testpath, lettercode, longrepr):
+ py.builtin.print_("%s %s" % (lettercode, testpath), file=self.logfile)
+ for line in longrepr.splitlines():
+ py.builtin.print_(" %s" % line, file=self.logfile)
+
+ def log_outcome(self, report, lettercode, longrepr):
+ testpath = getattr(report, 'nodeid', None)
+ if testpath is None:
+ testpath = report.fspath
+ self.write_log_entry(testpath, lettercode, longrepr)
+
+ def pytest_runtest_logreport(self, report):
+ res = self.config.hook.pytest_report_teststatus(report=report)
+ code = res[1]
+ if code == 'x':
+ longrepr = str(report.longrepr)
+ elif code == 'X':
+ longrepr = ''
+ elif report.passed:
+ longrepr = ""
+ elif report.failed:
+ longrepr = str(report.longrepr)
+ elif report.skipped:
+ longrepr = str(report.longrepr[2])
+ self.log_outcome(report, code, longrepr)
+
+ def pytest_collectreport(self, report):
+ if not report.passed:
+ if report.failed:
+ code = "F"
+ longrepr = str(report.longrepr.reprcrash)
+ else:
+ assert report.skipped
+ code = "S"
+ longrepr = "%s:%d: %s" % report.longrepr
+ self.log_outcome(report, code, longrepr)
+
+ def pytest_internalerror(self, excrepr):
+ path = excrepr.reprcrash.path
+ self.write_log_entry(path, '!', str(excrepr))
diff --git a/_pytest/runner.py b/_pytest/runner.py
new file mode 100644
index 0000000000..08cdc96f31
--- /dev/null
+++ b/_pytest/runner.py
@@ -0,0 +1,390 @@
+""" basic collect and runtest protocol implementations """
+
+import py, sys
+from py._code.code import TerminalRepr
+
+def pytest_namespace():
+ return {
+ 'fail' : fail,
+ 'skip' : skip,
+ 'importorskip' : importorskip,
+ 'exit' : exit,
+ }
+
+#
+# pytest plugin hooks
+
+# XXX move to pytest_sessionstart and fix py.test owns tests
+def pytest_configure(config):
+ config._setupstate = SetupState()
+
+def pytest_sessionfinish(session, exitstatus):
+ if hasattr(session.config, '_setupstate'):
+ hook = session.config.hook
+ rep = hook.pytest__teardown_final(session=session)
+ if rep:
+ hook.pytest__teardown_final_logerror(session=session, report=rep)
+ session.exitstatus = 1
+
+class NodeInfo:
+ def __init__(self, location):
+ self.location = location
+
+def pytest_runtest_protocol(item):
+ item.ihook.pytest_runtest_logstart(
+ nodeid=item.nodeid, location=item.location,
+ )
+ runtestprotocol(item)
+ return True
+
+def runtestprotocol(item, log=True):
+ rep = call_and_report(item, "setup", log)
+ reports = [rep]
+ if rep.passed:
+ reports.append(call_and_report(item, "call", log))
+ reports.append(call_and_report(item, "teardown", log))
+ return reports
+
+def pytest_runtest_setup(item):
+ item.config._setupstate.prepare(item)
+
+def pytest_runtest_call(item):
+ item.runtest()
+
+def pytest_runtest_teardown(item):
+ item.config._setupstate.teardown_exact(item)
+
+def pytest__teardown_final(session):
+ call = CallInfo(session.config._setupstate.teardown_all, when="teardown")
+ if call.excinfo:
+ ntraceback = call.excinfo.traceback .cut(excludepath=py._pydir)
+ call.excinfo.traceback = ntraceback.filter()
+ longrepr = call.excinfo.getrepr(funcargs=True)
+ return TeardownErrorReport(longrepr)
+
+def pytest_report_teststatus(report):
+ if report.when in ("setup", "teardown"):
+ if report.failed:
+ # category, shortletter, verbose-word
+ return "error", "E", "ERROR"
+ elif report.skipped:
+ return "skipped", "s", "SKIPPED"
+ else:
+ return "", "", ""
+
+
+#
+# Implementation
+
+def call_and_report(item, when, log=True):
+ call = call_runtest_hook(item, when)
+ hook = item.ihook
+ report = hook.pytest_runtest_makereport(item=item, call=call)
+ if log and (when == "call" or not report.passed):
+ hook.pytest_runtest_logreport(report=report)
+ return report
+
+def call_runtest_hook(item, when):
+ hookname = "pytest_runtest_" + when
+ ihook = getattr(item.ihook, hookname)
+ return CallInfo(lambda: ihook(item=item), when=when)
+
+class CallInfo:
+ """ Result/Exception info a function invocation. """
+ #: None or ExceptionInfo object.
+ excinfo = None
+ def __init__(self, func, when):
+ #: context of invocation: one of "setup", "call",
+ #: "teardown", "memocollect"
+ self.when = when
+ try:
+ self.result = func()
+ except KeyboardInterrupt:
+ raise
+ except:
+ self.excinfo = py.code.ExceptionInfo()
+
+ def __repr__(self):
+ if self.excinfo:
+ status = "exception: %s" % str(self.excinfo.value)
+ else:
+ status = "result: %r" % (self.result,)
+ return "<CallInfo when=%r %s>" % (self.when, status)
+
+def getslaveinfoline(node):
+ try:
+ return node._slaveinfocache
+ except AttributeError:
+ d = node.slaveinfo
+ ver = "%s.%s.%s" % d['version_info'][:3]
+ node._slaveinfocache = s = "[%s] %s -- Python %s %s" % (
+ d['id'], d['sysplatform'], ver, d['executable'])
+ return s
+
+class BaseReport(object):
+ def toterminal(self, out):
+ longrepr = self.longrepr
+ if hasattr(self, 'node'):
+ out.line(getslaveinfoline(self.node))
+ if hasattr(longrepr, 'toterminal'):
+ longrepr.toterminal(out)
+ else:
+ out.line(str(longrepr))
+
+ passed = property(lambda x: x.outcome == "passed")
+ failed = property(lambda x: x.outcome == "failed")
+ skipped = property(lambda x: x.outcome == "skipped")
+
+ @property
+ def fspath(self):
+ return self.nodeid.split("::")[0]
+
+def pytest_runtest_makereport(item, call):
+ when = call.when
+ keywords = dict([(x,1) for x in item.keywords])
+ excinfo = call.excinfo
+ if not call.excinfo:
+ outcome = "passed"
+ longrepr = None
+ else:
+ excinfo = call.excinfo
+ if not isinstance(excinfo, py.code.ExceptionInfo):
+ outcome = "failed"
+ longrepr = excinfo
+ elif excinfo.errisinstance(py.test.skip.Exception):
+ outcome = "skipped"
+ r = item._repr_failure_py(excinfo, "line").reprcrash
+ longrepr = (str(r.path), r.lineno, r.message)
+ else:
+ outcome = "failed"
+ if call.when == "call":
+ longrepr = item.repr_failure(excinfo)
+ else: # exception in setup or teardown
+ longrepr = item._repr_failure_py(excinfo)
+ return TestReport(item.nodeid, item.location,
+ keywords, outcome, longrepr, when)
+
+class TestReport(BaseReport):
+ """ Basic test report object (also used for setup and teardown calls if
+ they fail).
+ """
+ def __init__(self, nodeid, location,
+ keywords, outcome, longrepr, when):
+ #: normalized collection node id
+ self.nodeid = nodeid
+
+ #: a (filesystempath, lineno, domaininfo) tuple indicating the
+ #: actual location of a test item - it might be different from the
+ #: collected one e.g. if a method is inherited from a different module.
+ self.location = location
+
+ #: a name -> value dictionary containing all keywords and
+ #: markers associated with a test invocation.
+ self.keywords = keywords
+
+ #: test outcome, always one of "passed", "failed", "skipped".
+ self.outcome = outcome
+
+ #: None or a failure representation.
+ self.longrepr = longrepr
+
+ #: one of 'setup', 'call', 'teardown' to indicate runtest phase.
+ self.when = when
+
+ def __repr__(self):
+ return "<TestReport %r when=%r outcome=%r>" % (
+ self.nodeid, self.when, self.outcome)
+
+class TeardownErrorReport(BaseReport):
+ outcome = "failed"
+ when = "teardown"
+ def __init__(self, longrepr):
+ self.longrepr = longrepr
+
+def pytest_make_collect_report(collector):
+ call = CallInfo(collector._memocollect, "memocollect")
+ longrepr = None
+ if not call.excinfo:
+ outcome = "passed"
+ else:
+ if call.excinfo.errisinstance(py.test.skip.Exception):
+ outcome = "skipped"
+ r = collector._repr_failure_py(call.excinfo, "line").reprcrash
+ longrepr = (str(r.path), r.lineno, r.message)
+ else:
+ outcome = "failed"
+ errorinfo = collector.repr_failure(call.excinfo)
+ if not hasattr(errorinfo, "toterminal"):
+ errorinfo = CollectErrorRepr(errorinfo)
+ longrepr = errorinfo
+ return CollectReport(collector.nodeid, outcome, longrepr,
+ getattr(call, 'result', None))
+
+class CollectReport(BaseReport):
+ def __init__(self, nodeid, outcome, longrepr, result):
+ self.nodeid = nodeid
+ self.outcome = outcome
+ self.longrepr = longrepr
+ self.result = result or []
+
+ @property
+ def location(self):
+ return (self.fspath, None, self.fspath)
+
+ def __repr__(self):
+ return "<CollectReport %r lenresult=%s outcome=%r>" % (
+ self.nodeid, len(self.result), self.outcome)
+
+class CollectErrorRepr(TerminalRepr):
+ def __init__(self, msg):
+ self.longrepr = msg
+ def toterminal(self, out):
+ out.line(str(self.longrepr), red=True)
+
+class SetupState(object):
+ """ shared state for setting up/tearing down test items or collectors. """
+ def __init__(self):
+ self.stack = []
+ self._finalizers = {}
+
+ def addfinalizer(self, finalizer, colitem):
+ """ attach a finalizer to the given colitem.
+ if colitem is None, this will add a finalizer that
+ is called at the end of teardown_all().
+ """
+ assert hasattr(finalizer, '__call__')
+ #assert colitem in self.stack
+ self._finalizers.setdefault(colitem, []).append(finalizer)
+
+ def _pop_and_teardown(self):
+ colitem = self.stack.pop()
+ self._teardown_with_finalization(colitem)
+
+ def _callfinalizers(self, colitem):
+ finalizers = self._finalizers.pop(colitem, None)
+ while finalizers:
+ fin = finalizers.pop()
+ fin()
+
+ def _teardown_with_finalization(self, colitem):
+ self._callfinalizers(colitem)
+ if colitem:
+ colitem.teardown()
+ for colitem in self._finalizers:
+ assert colitem is None or colitem in self.stack
+
+ def teardown_all(self):
+ while self.stack:
+ self._pop_and_teardown()
+ self._teardown_with_finalization(None)
+ assert not self._finalizers
+
+ def teardown_exact(self, item):
+ if self.stack and item == self.stack[-1]:
+ self._pop_and_teardown()
+ else:
+ self._callfinalizers(item)
+
+ def prepare(self, colitem):
+ """ setup objects along the collector chain to the test-method
+ and teardown previously setup objects."""
+ needed_collectors = colitem.listchain()
+ while self.stack:
+ if self.stack == needed_collectors[:len(self.stack)]:
+ break
+ self._pop_and_teardown()
+ # check if the last collection node has raised an error
+ for col in self.stack:
+ if hasattr(col, '_prepare_exc'):
+ py.builtin._reraise(*col._prepare_exc)
+ for col in needed_collectors[len(self.stack):]:
+ self.stack.append(col)
+ try:
+ col.setup()
+ except Exception:
+ col._prepare_exc = sys.exc_info()
+ raise
+
+# =============================================================
+# Test OutcomeExceptions and helpers for creating them.
+
+
+class OutcomeException(Exception):
+ """ OutcomeException and its subclass instances indicate and
+ contain info about test and collection outcomes.
+ """
+ def __init__(self, msg=None, pytrace=True):
+ self.msg = msg
+ self.pytrace = pytrace
+
+ def __repr__(self):
+ if self.msg:
+ return str(self.msg)
+ return "<%s instance>" %(self.__class__.__name__,)
+ __str__ = __repr__
+
+class Skipped(OutcomeException):
+ # XXX hackish: on 3k we fake to live in the builtins
+ # in order to have Skipped exception printing shorter/nicer
+ __module__ = 'builtins'
+
+class Failed(OutcomeException):
+ """ raised from an explicit call to py.test.fail() """
+ __module__ = 'builtins'
+
+class Exit(KeyboardInterrupt):
+ """ raised for immediate program exits (no tracebacks/summaries)"""
+ def __init__(self, msg="unknown reason"):
+ self.msg = msg
+ KeyboardInterrupt.__init__(self, msg)
+
+# exposed helper methods
+
+def exit(msg):
+ """ exit testing process as if KeyboardInterrupt was triggered. """
+ __tracebackhide__ = True
+ raise Exit(msg)
+
+exit.Exception = Exit
+
+def skip(msg=""):
+ """ skip an executing test with the given message. Note: it's usually
+ better to use the py.test.mark.skipif marker to declare a test to be
+ skipped under certain conditions like mismatching platforms or
+ dependencies. See the pytest_skipping plugin for details.
+ """
+ __tracebackhide__ = True
+ raise Skipped(msg=msg)
+skip.Exception = Skipped
+
+def fail(msg="", pytrace=True):
+ """ explicitely fail an currently-executing test with the given Message.
+ if @pytrace is not True the msg represents the full failure information.
+ """
+ __tracebackhide__ = True
+ raise Failed(msg=msg, pytrace=pytrace)
+fail.Exception = Failed
+
+
+def importorskip(modname, minversion=None):
+ """ return imported module if it has a higher __version__ than the
+ optionally specified 'minversion' - otherwise call py.test.skip()
+ with a message detailing the mismatch.
+ """
+ __tracebackhide__ = True
+ compile(modname, '', 'eval') # to catch syntaxerrors
+ try:
+ mod = __import__(modname, None, None, ['__doc__'])
+ except ImportError:
+ py.test.skip("could not import %r" %(modname,))
+ if minversion is None:
+ return mod
+ verattr = getattr(mod, '__version__', None)
+ if isinstance(minversion, str):
+ minver = minversion.split(".")
+ else:
+ minver = list(minversion)
+ if verattr is None or verattr.split(".") < minver:
+ py.test.skip("module %r has __version__ %r, required is: %r" %(
+ modname, verattr, minversion))
+ return mod
diff --git a/_pytest/skipping.py b/_pytest/skipping.py
new file mode 100644
index 0000000000..106c8518c3
--- /dev/null
+++ b/_pytest/skipping.py
@@ -0,0 +1,213 @@
+""" support for skip/xfail functions and markers. """
+
+import py, pytest
+
+def pytest_addoption(parser):
+ group = parser.getgroup("general")
+ group.addoption('--runxfail',
+ action="store_true", dest="runxfail", default=False,
+ help="run tests even if they are marked xfail")
+
+def pytest_namespace():
+ return dict(xfail=xfail)
+
+class XFailed(pytest.fail.Exception):
+ """ raised from an explicit call to py.test.xfail() """
+
+def xfail(reason=""):
+ """ xfail an executing test or setup functions with the given reason."""
+ __tracebackhide__ = True
+ raise XFailed(reason)
+xfail.Exception = XFailed
+
+class MarkEvaluator:
+ def __init__(self, item, name):
+ self.item = item
+ self.name = name
+
+ @property
+ def holder(self):
+ return self.item.keywords.get(self.name, None)
+ def __bool__(self):
+ return bool(self.holder)
+ __nonzero__ = __bool__
+
+ def istrue(self):
+ if self.holder:
+ d = {'os': py.std.os, 'sys': py.std.sys, 'config': self.item.config}
+ if self.holder.args:
+ self.result = False
+ for expr in self.holder.args:
+ self.expr = expr
+ if isinstance(expr, str):
+ result = cached_eval(self.item.config, expr, d)
+ else:
+ result = expr
+ if result:
+ self.result = True
+ self.expr = expr
+ break
+ else:
+ self.result = True
+ return getattr(self, 'result', False)
+
+ def get(self, attr, default=None):
+ return self.holder.kwargs.get(attr, default)
+
+ def getexplanation(self):
+ expl = self.get('reason', None)
+ if not expl:
+ if not hasattr(self, 'expr'):
+ return ""
+ else:
+ return "condition: " + self.expr
+ return expl
+
+
+def pytest_runtest_setup(item):
+ if not isinstance(item, pytest.Function):
+ return
+ evalskip = MarkEvaluator(item, 'skipif')
+ if evalskip.istrue():
+ py.test.skip(evalskip.getexplanation())
+ item._evalxfail = MarkEvaluator(item, 'xfail')
+ check_xfail_no_run(item)
+
+def pytest_pyfunc_call(pyfuncitem):
+ check_xfail_no_run(pyfuncitem)
+
+def check_xfail_no_run(item):
+ if not item.config.option.runxfail:
+ evalxfail = item._evalxfail
+ if evalxfail.istrue():
+ if not evalxfail.get('run', True):
+ py.test.xfail("[NOTRUN] " + evalxfail.getexplanation())
+
+def pytest_runtest_makereport(__multicall__, item, call):
+ if not isinstance(item, pytest.Function):
+ return
+ if not (call.excinfo and
+ call.excinfo.errisinstance(py.test.xfail.Exception)):
+ evalxfail = getattr(item, '_evalxfail', None)
+ if not evalxfail:
+ return
+ if call.excinfo and call.excinfo.errisinstance(py.test.xfail.Exception):
+ if not item.config.getvalue("runxfail"):
+ rep = __multicall__.execute()
+ rep.keywords['xfail'] = "reason: " + call.excinfo.value.msg
+ rep.outcome = "skipped"
+ return rep
+ rep = __multicall__.execute()
+ evalxfail = item._evalxfail
+ if not item.config.option.runxfail and evalxfail.istrue():
+ if call.excinfo:
+ rep.outcome = "skipped"
+ rep.keywords['xfail'] = evalxfail.getexplanation()
+ elif call.when == "call":
+ rep.outcome = "failed"
+ rep.keywords['xfail'] = evalxfail.getexplanation()
+ else:
+ if 'xfail' in rep.keywords:
+ del rep.keywords['xfail']
+ return rep
+
+# called by terminalreporter progress reporting
+def pytest_report_teststatus(report):
+ if 'xfail' in report.keywords:
+ if report.skipped:
+ return "xfailed", "x", "xfail"
+ elif report.failed:
+ return "xpassed", "X", "XPASS"
+
+# called by the terminalreporter instance/plugin
+def pytest_terminal_summary(terminalreporter):
+ tr = terminalreporter
+ if not tr.reportchars:
+ #for name in "xfailed skipped failed xpassed":
+ # if not tr.stats.get(name, 0):
+ # tr.write_line("HINT: use '-r' option to see extra "
+ # "summary info about tests")
+ # break
+ return
+
+ lines = []
+ for char in tr.reportchars:
+ if char == "x":
+ show_xfailed(terminalreporter, lines)
+ elif char == "X":
+ show_xpassed(terminalreporter, lines)
+ elif char in "fF":
+ show_failed(terminalreporter, lines)
+ elif char in "sS":
+ show_skipped(terminalreporter, lines)
+ if lines:
+ tr._tw.sep("=", "short test summary info")
+ for line in lines:
+ tr._tw.line(line)
+
+def show_failed(terminalreporter, lines):
+ tw = terminalreporter._tw
+ failed = terminalreporter.stats.get("failed")
+ if failed:
+ for rep in failed:
+ pos = rep.nodeid
+ lines.append("FAIL %s" %(pos, ))
+
+def show_xfailed(terminalreporter, lines):
+ xfailed = terminalreporter.stats.get("xfailed")
+ if xfailed:
+ for rep in xfailed:
+ pos = rep.nodeid
+ reason = rep.keywords['xfail']
+ lines.append("XFAIL %s" % (pos,))
+ if reason:
+ lines.append(" " + str(reason))
+
+def show_xpassed(terminalreporter, lines):
+ xpassed = terminalreporter.stats.get("xpassed")
+ if xpassed:
+ for rep in xpassed:
+ pos = rep.nodeid
+ reason = rep.keywords['xfail']
+ lines.append("XPASS %s %s" %(pos, reason))
+
+def cached_eval(config, expr, d):
+ if not hasattr(config, '_evalcache'):
+ config._evalcache = {}
+ try:
+ return config._evalcache[expr]
+ except KeyError:
+ #import sys
+ #print >>sys.stderr, ("cache-miss: %r" % expr)
+ config._evalcache[expr] = x = eval(expr, d)
+ return x
+
+
+def folded_skips(skipped):
+ d = {}
+ for event in skipped:
+ key = event.longrepr
+ assert len(key) == 3, (event, key)
+ d.setdefault(key, []).append(event)
+ l = []
+ for key, events in d.items():
+ l.append((len(events),) + key)
+ return l
+
+def show_skipped(terminalreporter, lines):
+ tr = terminalreporter
+ skipped = tr.stats.get('skipped', [])
+ if skipped:
+ #if not tr.hasopt('skipped'):
+ # tr.write_line(
+ # "%d skipped tests, specify -rs for more info" %
+ # len(skipped))
+ # return
+ fskips = folded_skips(skipped)
+ if fskips:
+ #tr.write_sep("_", "skipped test summary")
+ for num, fspath, lineno, reason in fskips:
+ if reason.startswith("Skipped: "):
+ reason = reason[9:]
+ lines.append("SKIP [%d] %s:%d: %s" %
+ (num, fspath, lineno, reason))
diff --git a/_pytest/standalonetemplate.py b/_pytest/standalonetemplate.py
new file mode 100755
index 0000000000..5aaceaa9ee
--- /dev/null
+++ b/_pytest/standalonetemplate.py
@@ -0,0 +1,63 @@
+#! /usr/bin/env python
+
+sources = """
+@SOURCES@"""
+
+import sys
+import base64
+import zlib
+import imp
+
+class DictImporter(object):
+ def __init__(self, sources):
+ self.sources = sources
+
+ def find_module(self, fullname, path=None):
+ if fullname in self.sources:
+ return self
+ if fullname + '.__init__' in self.sources:
+ return self
+ return None
+
+ def load_module(self, fullname):
+ # print "load_module:", fullname
+ from types import ModuleType
+ try:
+ s = self.sources[fullname]
+ is_pkg = False
+ except KeyError:
+ s = self.sources[fullname + '.__init__']
+ is_pkg = True
+
+ co = compile(s, fullname, 'exec')
+ module = sys.modules.setdefault(fullname, ModuleType(fullname))
+ module.__file__ = "%s/%s" % (__file__, fullname)
+ module.__loader__ = self
+ if is_pkg:
+ module.__path__ = [fullname]
+
+ do_exec(co, module.__dict__)
+ return sys.modules[fullname]
+
+ def get_source(self, name):
+ res = self.sources.get(name)
+ if res is None:
+ res = self.sources.get(name + '.__init__')
+ return res
+
+if __name__ == "__main__":
+ if sys.version_info >= (3, 0):
+ exec("def do_exec(co, loc): exec(co, loc)\n")
+ import pickle
+ sources = sources.encode("ascii") # ensure bytes
+ sources = pickle.loads(zlib.decompress(base64.decodebytes(sources)))
+ else:
+ import cPickle as pickle
+ exec("def do_exec(co, loc): exec co in loc\n")
+ sources = pickle.loads(zlib.decompress(base64.decodestring(sources)))
+
+ importer = DictImporter(sources)
+ sys.meta_path.append(importer)
+
+ entry = "@ENTRY@"
+ do_exec(entry, locals())
diff --git a/_pytest/terminal.py b/_pytest/terminal.py
new file mode 100644
index 0000000000..0b0ab1ee2f
--- /dev/null
+++ b/_pytest/terminal.py
@@ -0,0 +1,467 @@
+""" terminal reporting of the full testing process.
+
+This is a good source for looking at the various reporting hooks.
+"""
+import pytest, py
+import sys
+import os
+
+def pytest_addoption(parser):
+ group = parser.getgroup("terminal reporting", "reporting", after="general")
+ group._addoption('-v', '--verbose', action="count",
+ dest="verbose", default=0, help="increase verbosity."),
+ group._addoption('-q', '--quiet', action="count",
+ dest="quiet", default=0, help="decreate verbosity."),
+ group._addoption('-r',
+ action="store", dest="reportchars", default=None, metavar="chars",
+ help="show extra test summary info as specified by chars (f)ailed, "
+ "(s)skipped, (x)failed, (X)passed.")
+ group._addoption('-l', '--showlocals',
+ action="store_true", dest="showlocals", default=False,
+ help="show locals in tracebacks (disabled by default).")
+ group._addoption('--report',
+ action="store", dest="report", default=None, metavar="opts",
+ help="(deprecated, use -r)")
+ group._addoption('--tb', metavar="style",
+ action="store", dest="tbstyle", default='long',
+ type="choice", choices=['long', 'short', 'no', 'line', 'native'],
+ help="traceback print mode (long/short/line/no).")
+ group._addoption('--fulltrace',
+ action="store_true", dest="fulltrace", default=False,
+ help="don't cut any tracebacks (default is to cut).")
+
+def pytest_configure(config):
+ config.option.verbose -= config.option.quiet
+ if config.option.collectonly:
+ reporter = CollectonlyReporter(config)
+ else:
+ # we try hard to make printing resilient against
+ # later changes on FD level.
+ stdout = py.std.sys.stdout
+ if hasattr(os, 'dup') and hasattr(stdout, 'fileno'):
+ try:
+ newfd = os.dup(stdout.fileno())
+ #print "got newfd", newfd
+ except ValueError:
+ pass
+ else:
+ stdout = os.fdopen(newfd, stdout.mode, 1)
+ config._toclose = stdout
+ reporter = TerminalReporter(config, stdout)
+ config.pluginmanager.register(reporter, 'terminalreporter')
+ if config.option.debug or config.option.traceconfig:
+ def mywriter(tags, args):
+ msg = " ".join(map(str, args))
+ reporter.write_line("[traceconfig] " + msg)
+ config.trace.root.setprocessor("pytest:config", mywriter)
+
+def pytest_unconfigure(config):
+ if hasattr(config, '_toclose'):
+ #print "closing", config._toclose, config._toclose.fileno()
+ config._toclose.close()
+
+def getreportopt(config):
+ reportopts = ""
+ optvalue = config.option.report
+ if optvalue:
+ py.builtin.print_("DEPRECATED: use -r instead of --report option.",
+ file=py.std.sys.stderr)
+ if optvalue:
+ for setting in optvalue.split(","):
+ setting = setting.strip()
+ if setting == "skipped":
+ reportopts += "s"
+ elif setting == "xfailed":
+ reportopts += "x"
+ reportchars = config.option.reportchars
+ if reportchars:
+ for char in reportchars:
+ if char not in reportopts:
+ reportopts += char
+ return reportopts
+
+def pytest_report_teststatus(report):
+ if report.passed:
+ letter = "."
+ elif report.skipped:
+ letter = "s"
+ elif report.failed:
+ letter = "F"
+ if report.when != "call":
+ letter = "f"
+ return report.outcome, letter, report.outcome.upper()
+
+class TerminalReporter:
+ def __init__(self, config, file=None):
+ self.config = config
+ self.verbosity = self.config.option.verbose
+ self.showheader = self.verbosity >= 0
+ self.showfspath = self.verbosity >= 0
+ self.showlongtestinfo = self.verbosity > 0
+ self._numcollected = 0
+
+ self.stats = {}
+ self.curdir = py.path.local()
+ if file is None:
+ file = py.std.sys.stdout
+ self._tw = py.io.TerminalWriter(file)
+ self.currentfspath = None
+ self.reportchars = getreportopt(config)
+ self.hasmarkup = self._tw.hasmarkup
+
+ def hasopt(self, char):
+ char = {'xfailed': 'x', 'skipped': 's'}.get(char,char)
+ return char in self.reportchars
+
+ def write_fspath_result(self, fspath, res):
+ if fspath != self.currentfspath:
+ self.currentfspath = fspath
+ #fspath = self.curdir.bestrelpath(fspath)
+ self._tw.line()
+ #relpath = self.curdir.bestrelpath(fspath)
+ self._tw.write(fspath + " ")
+ self._tw.write(res)
+
+ def write_ensure_prefix(self, prefix, extra="", **kwargs):
+ if self.currentfspath != prefix:
+ self._tw.line()
+ self.currentfspath = prefix
+ self._tw.write(prefix)
+ if extra:
+ self._tw.write(extra, **kwargs)
+ self.currentfspath = -2
+
+ def ensure_newline(self):
+ if self.currentfspath:
+ self._tw.line()
+ self.currentfspath = None
+
+ def write(self, content, **markup):
+ self._tw.write(content, **markup)
+
+ def write_line(self, line, **markup):
+ line = str(line)
+ self.ensure_newline()
+ self._tw.line(line, **markup)
+
+ def rewrite(self, line, **markup):
+ line = str(line)
+ self._tw.write("\r" + line, **markup)
+
+ def write_sep(self, sep, title=None, **markup):
+ self.ensure_newline()
+ self._tw.sep(sep, title, **markup)
+
+ def pytest_internalerror(self, excrepr):
+ for line in str(excrepr).split("\n"):
+ self.write_line("INTERNALERROR> " + line)
+ return 1
+
+ def pytest_plugin_registered(self, plugin):
+ if self.config.option.traceconfig:
+ msg = "PLUGIN registered: %s" %(plugin,)
+ # XXX this event may happen during setup/teardown time
+ # which unfortunately captures our output here
+ # which garbles our output if we use self.write_line
+ self.write_line(msg)
+
+ def pytest_deselected(self, items):
+ self.stats.setdefault('deselected', []).extend(items)
+
+ def pytest__teardown_final_logerror(self, report):
+ self.stats.setdefault("error", []).append(report)
+
+ def pytest_runtest_logstart(self, nodeid, location):
+ # ensure that the path is printed before the
+ # 1st test of a module starts running
+ fspath = nodeid.split("::")[0]
+ if self.showlongtestinfo:
+ line = self._locationline(fspath, *location)
+ self.write_ensure_prefix(line, "")
+ elif self.showfspath:
+ self.write_fspath_result(fspath, "")
+
+ def pytest_runtest_logreport(self, report):
+ rep = report
+ res = self.config.hook.pytest_report_teststatus(report=rep)
+ cat, letter, word = res
+ self.stats.setdefault(cat, []).append(rep)
+ if not letter and not word:
+ # probably passed setup/teardown
+ return
+ if self.verbosity <= 0:
+ if not hasattr(rep, 'node') and self.showfspath:
+ self.write_fspath_result(rep.fspath, letter)
+ else:
+ self._tw.write(letter)
+ else:
+ if isinstance(word, tuple):
+ word, markup = word
+ else:
+ if rep.passed:
+ markup = {'green':True}
+ elif rep.failed:
+ markup = {'red':True}
+ elif rep.skipped:
+ markup = {'yellow':True}
+ line = self._locationline(str(rep.fspath), *rep.location)
+ if not hasattr(rep, 'node'):
+ self.write_ensure_prefix(line, word, **markup)
+ #self._tw.write(word, **markup)
+ else:
+ self.ensure_newline()
+ if hasattr(rep, 'node'):
+ self._tw.write("[%s] " % rep.node.gateway.id)
+ self._tw.write(word, **markup)
+ self._tw.write(" " + line)
+ self.currentfspath = -2
+
+ def pytest_collection(self):
+ if not self.hasmarkup:
+ self.write("collecting ... ", bold=True)
+
+ def pytest_collectreport(self, report):
+ if report.failed:
+ self.stats.setdefault("error", []).append(report)
+ elif report.skipped:
+ self.stats.setdefault("skipped", []).append(report)
+ items = [x for x in report.result if isinstance(x, pytest.Item)]
+ self._numcollected += len(items)
+ if self.hasmarkup:
+ #self.write_fspath_result(report.fspath, 'E')
+ self.report_collect()
+
+ def report_collect(self, final=False):
+ errors = len(self.stats.get('error', []))
+ skipped = len(self.stats.get('skipped', []))
+ if final:
+ line = "collected "
+ else:
+ line = "collecting "
+ line += str(self._numcollected) + " items"
+ if errors:
+ line += " / %d errors" % errors
+ if skipped:
+ line += " / %d skipped" % skipped
+ if self.hasmarkup:
+ if final:
+ line += " \n"
+ self.rewrite(line, bold=True)
+ else:
+ self.write_line(line)
+
+ def pytest_collection_modifyitems(self):
+ self.report_collect(True)
+
+ def pytest_sessionstart(self, session):
+ self._sessionstarttime = py.std.time.time()
+ if not self.showheader:
+ return
+ self.write_sep("=", "test session starts", bold=True)
+ verinfo = ".".join(map(str, sys.version_info[:3]))
+ msg = "platform %s -- Python %s" % (sys.platform, verinfo)
+ if hasattr(sys, 'pypy_version_info'):
+ verinfo = ".".join(map(str, sys.pypy_version_info[:3]))
+ msg += "[pypy-%s]" % verinfo
+ msg += " -- pytest-%s" % (py.test.__version__)
+ if self.verbosity > 0 or self.config.option.debug or \
+ getattr(self.config.option, 'pastebin', None):
+ msg += " -- " + str(sys.executable)
+ self.write_line(msg)
+ lines = self.config.hook.pytest_report_header(config=self.config)
+ lines.reverse()
+ for line in flatten(lines):
+ self.write_line(line)
+
+ def pytest_collection_finish(self):
+ if not self.showheader:
+ return
+ #for i, testarg in enumerate(self.config.args):
+ # self.write_line("test path %d: %s" %(i+1, testarg))
+
+ def pytest_sessionfinish(self, exitstatus, __multicall__):
+ __multicall__.execute()
+ self._tw.line("")
+ if exitstatus in (0, 1, 2):
+ self.summary_errors()
+ self.summary_failures()
+ self.config.hook.pytest_terminal_summary(terminalreporter=self)
+ if exitstatus == 2:
+ self._report_keyboardinterrupt()
+ self.summary_deselected()
+ self.summary_stats()
+
+ def pytest_keyboard_interrupt(self, excinfo):
+ self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True)
+
+ def _report_keyboardinterrupt(self):
+ excrepr = self._keyboardinterrupt_memo
+ msg = excrepr.reprcrash.message
+ self.write_sep("!", msg)
+ if "KeyboardInterrupt" in msg:
+ if self.config.option.fulltrace:
+ excrepr.toterminal(self._tw)
+ else:
+ excrepr.reprcrash.toterminal(self._tw)
+
+ def _locationline(self, collect_fspath, fspath, lineno, domain):
+ if fspath and fspath != collect_fspath:
+ fspath = "%s <- %s" % (collect_fspath, fspath)
+ if lineno is not None:
+ lineno += 1
+ if fspath and lineno and domain:
+ line = "%(fspath)s:%(lineno)s: %(domain)s"
+ elif fspath and domain:
+ line = "%(fspath)s: %(domain)s"
+ elif fspath and lineno:
+ line = "%(fspath)s:%(lineno)s %(extrapath)s"
+ else:
+ line = "[nolocation]"
+ return line % locals() + " "
+
+ def _getfailureheadline(self, rep):
+ if hasattr(rep, 'location'):
+ fspath, lineno, domain = rep.location
+ return domain
+ else:
+ return "test session" # XXX?
+
+ def _getcrashline(self, rep):
+ try:
+ return str(rep.longrepr.reprcrash)
+ except AttributeError:
+ try:
+ return str(rep.longrepr)[:50]
+ except AttributeError:
+ return ""
+
+ #
+ # summaries for sessionfinish
+ #
+ def getreports(self, name):
+ l = []
+ for x in self.stats.get(name, []):
+ if not hasattr(x, '_pdbshown'):
+ l.append(x)
+ return l
+
+ def summary_failures(self):
+ if self.config.option.tbstyle != "no":
+ reports = self.getreports('failed')
+ if not reports:
+ return
+ self.write_sep("=", "FAILURES")
+ for rep in reports:
+ if self.config.option.tbstyle == "line":
+ line = self._getcrashline(rep)
+ self.write_line(line)
+ else:
+ msg = self._getfailureheadline(rep)
+ self.write_sep("_", msg)
+ rep.toterminal(self._tw)
+
+ def summary_errors(self):
+ if self.config.option.tbstyle != "no":
+ reports = self.getreports('error')
+ if not reports:
+ return
+ self.write_sep("=", "ERRORS")
+ for rep in self.stats['error']:
+ msg = self._getfailureheadline(rep)
+ if not hasattr(rep, 'when'):
+ # collect
+ msg = "ERROR collecting " + msg
+ elif rep.when == "setup":
+ msg = "ERROR at setup of " + msg
+ elif rep.when == "teardown":
+ msg = "ERROR at teardown of " + msg
+ self.write_sep("_", msg)
+ rep.toterminal(self._tw)
+
+ def summary_stats(self):
+ session_duration = py.std.time.time() - self._sessionstarttime
+
+ keys = "failed passed skipped deselected".split()
+ for key in self.stats.keys():
+ if key not in keys:
+ keys.append(key)
+ parts = []
+ for key in keys:
+ val = self.stats.get(key, None)
+ if val:
+ parts.append("%d %s" %(len(val), key))
+ line = ", ".join(parts)
+ # XXX coloring
+ msg = "%s in %.2f seconds" %(line, session_duration)
+ if self.verbosity >= 0:
+ self.write_sep("=", msg, bold=True)
+ else:
+ self.write_line(msg, bold=True)
+
+ def summary_deselected(self):
+ if 'deselected' in self.stats:
+ self.write_sep("=", "%d tests deselected by %r" %(
+ len(self.stats['deselected']), self.config.option.keyword), bold=True)
+
+
+class CollectonlyReporter:
+ INDENT = " "
+
+ def __init__(self, config, out=None):
+ self.config = config
+ if out is None:
+ out = py.std.sys.stdout
+ self._tw = py.io.TerminalWriter(out)
+ self.indent = ""
+ self._failed = []
+
+ def outindent(self, line):
+ self._tw.line(self.indent + str(line))
+
+ def pytest_internalerror(self, excrepr):
+ for line in str(excrepr).split("\n"):
+ self._tw.line("INTERNALERROR> " + line)
+
+ def pytest_collectstart(self, collector):
+ if collector.session != collector:
+ self.outindent(collector)
+ self.indent += self.INDENT
+
+ def pytest_itemcollected(self, item):
+ self.outindent(item)
+
+ def pytest_collectreport(self, report):
+ if not report.passed:
+ if hasattr(report.longrepr, 'reprcrash'):
+ msg = report.longrepr.reprcrash.message
+ else:
+ # XXX unify (we have CollectErrorRepr here)
+ msg = str(report.longrepr[2])
+ self.outindent("!!! %s !!!" % msg)
+ #self.outindent("!!! error !!!")
+ self._failed.append(report)
+ self.indent = self.indent[:-len(self.INDENT)]
+
+ def pytest_collection_finish(self):
+ if self._failed:
+ self._tw.sep("!", "collection failures")
+ for rep in self._failed:
+ rep.toterminal(self._tw)
+ return self._failed and 1 or 0
+
+def repr_pythonversion(v=None):
+ if v is None:
+ v = sys.version_info
+ try:
+ return "%s.%s.%s-%s-%s" % v
+ except (TypeError, ValueError):
+ return str(v)
+
+def flatten(l):
+ for x in l:
+ if isinstance(x, (list, tuple)):
+ for y in flatten(x):
+ yield y
+ else:
+ yield x
+
diff --git a/_pytest/tmpdir.py b/_pytest/tmpdir.py
new file mode 100644
index 0000000000..9eab52d545
--- /dev/null
+++ b/_pytest/tmpdir.py
@@ -0,0 +1,71 @@
+""" support for providing temporary directories to test functions. """
+import pytest, py
+from _pytest.monkeypatch import monkeypatch
+
+class TempdirHandler:
+ def __init__(self, config):
+ self.config = config
+ self.trace = config.trace.get("tmpdir")
+
+ def ensuretemp(self, string, dir=1):
+ """ (deprecated) return temporary directory path with
+ the given string as the trailing part. It is usually
+ better to use the 'tmpdir' function argument which
+ provides an empty unique-per-test-invocation directory
+ and is guaranteed to be empty.
+ """
+ #py.log._apiwarn(">1.1", "use tmpdir function argument")
+ return self.getbasetemp().ensure(string, dir=dir)
+
+ def mktemp(self, basename, numbered=True):
+ basetemp = self.getbasetemp()
+ if not numbered:
+ p = basetemp.mkdir(basename)
+ else:
+ p = py.path.local.make_numbered_dir(prefix=basename,
+ keep=0, rootdir=basetemp, lock_timeout=None)
+ self.trace("mktemp", p)
+ return p
+
+ def getbasetemp(self):
+ """ return base temporary directory. """
+ try:
+ return self._basetemp
+ except AttributeError:
+ basetemp = self.config.option.basetemp
+ if basetemp:
+ basetemp = py.path.local(basetemp)
+ if basetemp.check():
+ basetemp.remove()
+ basetemp.mkdir()
+ else:
+ basetemp = py.path.local.make_numbered_dir(prefix='pytest-')
+ self._basetemp = t = basetemp
+ self.trace("new basetemp", t)
+ return t
+
+ def finish(self):
+ self.trace("finish")
+
+def pytest_configure(config):
+ config._mp = mp = monkeypatch()
+ t = TempdirHandler(config)
+ mp.setattr(config, '_tmpdirhandler', t, raising=False)
+ mp.setattr(pytest, 'ensuretemp', t.ensuretemp, raising=False)
+
+def pytest_unconfigure(config):
+ config._tmpdirhandler.finish()
+ config._mp.undo()
+
+def pytest_funcarg__tmpdir(request):
+ """return a temporary directory path object
+ unique to each test function invocation,
+ created as a sub directory of the base temporary
+ directory. The returned object is a `py.path.local`_
+ path object.
+ """
+ name = request._pyfuncitem.name
+ name = py.std.re.sub("[\W]", "_", name)
+ x = request.config._tmpdirhandler.mktemp(name, numbered=True)
+ return x.realpath()
+
diff --git a/_pytest/unittest.py b/_pytest/unittest.py
new file mode 100644
index 0000000000..5131a93e35
--- /dev/null
+++ b/_pytest/unittest.py
@@ -0,0 +1,139 @@
+""" discovery and running of std-library "unittest" style tests. """
+import pytest, py
+import sys, pdb
+
+def pytest_pycollect_makeitem(collector, name, obj):
+ unittest = sys.modules.get('unittest')
+ if unittest is None:
+ return # nobody can have derived unittest.TestCase
+ try:
+ isunit = issubclass(obj, unittest.TestCase)
+ except KeyboardInterrupt:
+ raise
+ except Exception:
+ pass
+ else:
+ if isunit:
+ return UnitTestCase(name, parent=collector)
+
+class UnitTestCase(pytest.Class):
+ def collect(self):
+ loader = py.std.unittest.TestLoader()
+ for name in loader.getTestCaseNames(self.obj):
+ yield TestCaseFunction(name, parent=self)
+
+ def setup(self):
+ meth = getattr(self.obj, 'setUpClass', None)
+ if meth is not None:
+ meth()
+ super(UnitTestCase, self).setup()
+
+ def teardown(self):
+ meth = getattr(self.obj, 'tearDownClass', None)
+ if meth is not None:
+ meth()
+ super(UnitTestCase, self).teardown()
+
+class TestCaseFunction(pytest.Function):
+ _excinfo = None
+
+ def __init__(self, name, parent):
+ super(TestCaseFunction, self).__init__(name, parent)
+ if hasattr(self._obj, 'todo'):
+ getattr(self._obj, 'im_func', self._obj).xfail = \
+ pytest.mark.xfail(reason=str(self._obj.todo))
+
+ def setup(self):
+ self._testcase = self.parent.obj(self.name)
+ self._obj = getattr(self._testcase, self.name)
+ if hasattr(self._testcase, 'setup_method'):
+ self._testcase.setup_method(self._obj)
+
+ def teardown(self):
+ if hasattr(self._testcase, 'teardown_method'):
+ self._testcase.teardown_method(self._obj)
+
+ def startTest(self, testcase):
+ pass
+
+ def _addexcinfo(self, rawexcinfo):
+ # unwrap potential exception info (see twisted trial support below)
+ rawexcinfo = getattr(rawexcinfo, '_rawexcinfo', rawexcinfo)
+ try:
+ excinfo = py.code.ExceptionInfo(rawexcinfo)
+ except TypeError:
+ try:
+ try:
+ l = py.std.traceback.format_exception(*rawexcinfo)
+ l.insert(0, "NOTE: Incompatible Exception Representation, "
+ "displaying natively:\n\n")
+ pytest.fail("".join(l), pytrace=False)
+ except (pytest.fail.Exception, KeyboardInterrupt):
+ raise
+ except:
+ pytest.fail("ERROR: Unknown Incompatible Exception "
+ "representation:\n%r" %(rawexcinfo,), pytrace=False)
+ except KeyboardInterrupt:
+ raise
+ except pytest.fail.Exception:
+ excinfo = py.code.ExceptionInfo()
+ self.__dict__.setdefault('_excinfo', []).append(excinfo)
+
+ def addError(self, testcase, rawexcinfo):
+ self._addexcinfo(rawexcinfo)
+ def addFailure(self, testcase, rawexcinfo):
+ self._addexcinfo(rawexcinfo)
+ def addSkip(self, testcase, reason):
+ try:
+ pytest.skip(reason)
+ except pytest.skip.Exception:
+ self._addexcinfo(sys.exc_info())
+ def addExpectedFailure(self, testcase, rawexcinfo, reason):
+ try:
+ pytest.xfail(str(reason))
+ except pytest.xfail.Exception:
+ self._addexcinfo(sys.exc_info())
+ def addUnexpectedSuccess(self, testcase, reason):
+ pass
+ def addSuccess(self, testcase):
+ pass
+ def stopTest(self, testcase):
+ pass
+ def runtest(self):
+ self._testcase(result=self)
+
+@pytest.mark.tryfirst
+def pytest_runtest_makereport(item, call):
+ if isinstance(item, TestCaseFunction):
+ if item._excinfo:
+ call.excinfo = item._excinfo.pop(0)
+ del call.result
+
+# twisted trial support
+def pytest_runtest_protocol(item, __multicall__):
+ if isinstance(item, TestCaseFunction):
+ if 'twisted.trial.unittest' in sys.modules:
+ ut = sys.modules['twisted.python.failure']
+ Failure__init__ = ut.Failure.__init__.im_func
+ check_testcase_implements_trial_reporter()
+ def excstore(self, exc_value=None, exc_type=None, exc_tb=None):
+ if exc_value is None:
+ self._rawexcinfo = sys.exc_info()
+ else:
+ if exc_type is None:
+ exc_type = type(exc_value)
+ self._rawexcinfo = (exc_type, exc_value, exc_tb)
+ Failure__init__(self, exc_value, exc_type, exc_tb)
+ ut.Failure.__init__ = excstore
+ try:
+ return __multicall__.execute()
+ finally:
+ ut.Failure.__init__ = Failure__init__
+
+def check_testcase_implements_trial_reporter(done=[]):
+ if done:
+ return
+ from zope.interface import classImplements
+ from twisted.trial.itrial import IReporter
+ classImplements(TestCaseFunction, IReporter)
+ done.append(1)