aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/snakeoil/contexts.py')
-rw-r--r--src/snakeoil/contexts.py78
1 files changed, 53 insertions, 25 deletions
diff --git a/src/snakeoil/contexts.py b/src/snakeoil/contexts.py
index 57092b4d..394574b5 100644
--- a/src/snakeoil/contexts.py
+++ b/src/snakeoil/contexts.py
@@ -41,6 +41,7 @@ from .sequences import predicate_split
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
+
class SplitExec:
"""Context manager separating code execution across parent/child processes.
@@ -48,6 +49,7 @@ class SplitExec:
of the context are executed only on the forked child. Exceptions are
pickled and passed back to the parent.
"""
+
def __init__(self):
self.__trace_lock = threading.Lock()
self.__orig_sys_trace = None
@@ -184,7 +186,7 @@ class SplitExec:
@staticmethod
def __excepthook(_exc_type, exc_value, exc_traceback):
"""Output the proper traceback information from the chroot context."""
- if hasattr(exc_value, '__traceback_list__'):
+ if hasattr(exc_value, "__traceback_list__"):
sys.stderr.write(exc_value.__traceback_list__)
else:
traceback.print_tb(exc_traceback)
@@ -253,7 +255,7 @@ class SplitExec:
except AttributeError:
# an offset of two accounts for this method and its caller
frame = inspect.stack(0)[2][0]
- while frame.f_locals.get('self') is self:
+ while frame.f_locals.get("self") is self:
frame = frame.f_back
self.__frame = frame # pylint: disable=W0201
return frame
@@ -262,11 +264,24 @@ class SplitExec:
class Namespace(SplitExec):
"""Context manager that provides Linux namespace support."""
- def __init__(self, mount=False, uts=True, ipc=False, net=False, pid=False,
- user=False, hostname=None):
+ def __init__(
+ self,
+ mount=False,
+ uts=True,
+ ipc=False,
+ net=False,
+ pid=False,
+ user=False,
+ hostname=None,
+ ):
self._hostname = hostname
self._namespaces = {
- 'mount': mount, 'uts': uts, 'ipc': ipc, 'net': net, 'pid': pid, 'user': user,
+ "mount": mount,
+ "uts": uts,
+ "ipc": ipc,
+ "net": net,
+ "pid": pid,
+ "user": user,
}
super().__init__()
@@ -279,8 +294,8 @@ class GitStash(AbstractContextManager):
def __init__(self, path, pathspecs=None, staged=False):
self.path = path
- self.pathspecs = ['--'] + pathspecs if pathspecs else []
- self._staged = ['--keep-index'] if staged else []
+ self.pathspecs = ["--"] + pathspecs if pathspecs else []
+ self._staged = ["--keep-index"] if staged else []
self._stashed = False
def __enter__(self):
@@ -288,14 +303,18 @@ class GitStash(AbstractContextManager):
# check for untracked or modified/uncommitted files
try:
p = subprocess.run(
- ['git', 'status', '--porcelain=1', '-u'] + self.pathspecs,
- stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
- cwd=self.path, encoding='utf8', check=True)
+ ["git", "status", "--porcelain=1", "-u"] + self.pathspecs,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ cwd=self.path,
+ encoding="utf8",
+ check=True,
+ )
except subprocess.CalledProcessError:
- raise ValueError(f'not a git repo: {self.path}')
+ raise ValueError(f"not a git repo: {self.path}")
# split file changes into unstaged vs staged
- unstaged, staged = predicate_split(lambda x: x[1] == ' ', p.stdout.splitlines())
+ unstaged, staged = predicate_split(lambda x: x[1] == " ", p.stdout.splitlines())
# don't stash when no relevant changes exist
if self._staged:
@@ -306,14 +325,18 @@ class GitStash(AbstractContextManager):
# stash all existing untracked or modified/uncommitted files
try:
- stash_cmd = ['git', 'stash', 'push', '-u', '-m', 'pkgcheck scan --commits']
+ stash_cmd = ["git", "stash", "push", "-u", "-m", "pkgcheck scan --commits"]
subprocess.run(
stash_cmd + self._staged + self.pathspecs,
- stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
- cwd=self.path, check=True, encoding='utf8')
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.PIPE,
+ cwd=self.path,
+ check=True,
+ encoding="utf8",
+ )
except subprocess.CalledProcessError as e:
error = e.stderr.splitlines()[0]
- raise UserException(f'git failed stashing files: {error}')
+ raise UserException(f"git failed stashing files: {error}")
self._stashed = True
def __exit__(self, _exc_type, _exc_value, _traceback):
@@ -321,12 +344,16 @@ class GitStash(AbstractContextManager):
if self._stashed:
try:
subprocess.run(
- ['git', 'stash', 'pop'],
- stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
- cwd=self.path, check=True, encoding='utf8')
+ ["git", "stash", "pop"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.PIPE,
+ cwd=self.path,
+ check=True,
+ encoding="utf8",
+ )
except subprocess.CalledProcessError as e:
error = e.stderr.splitlines()[0]
- raise UserException(f'git failed applying stash: {error}')
+ raise UserException(f"git failed applying stash: {error}")
@contextmanager
@@ -347,7 +374,7 @@ def chdir(path):
@contextmanager
-def syspath(path: str, condition: bool=True, position: int=0):
+def syspath(path: str, condition: bool = True, position: int = 0):
"""Context manager that mangles ``sys.path`` and then reverts on exit.
:param path: The directory path to add to ``sys.path``.
@@ -425,6 +452,7 @@ def os_environ(*remove, **update):
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
@contextmanager
def patch(target, new):
"""Simplified module monkey patching via context manager.
@@ -434,7 +462,7 @@ def patch(target, new):
"""
def _import_module(target):
- components = target.split('.')
+ components = target.split(".")
import_path = components.pop(0)
module = import_module(import_path)
for comp in components:
@@ -448,16 +476,16 @@ def patch(target, new):
def _get_target(target):
if isinstance(target, str):
try:
- module, attr = target.rsplit('.', 1)
+ module, attr = target.rsplit(".", 1)
except (TypeError, ValueError):
- raise TypeError(f'invalid target: {target!r}')
+ raise TypeError(f"invalid target: {target!r}")
module = _import_module(module)
return module, attr
else:
try:
obj, attr = target
except (TypeError, ValueError):
- raise TypeError(f'invalid target: {target!r}')
+ raise TypeError(f"invalid target: {target!r}")
return obj, attr
obj, attr = _get_target(target)