diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/_emerge/PollScheduler.py | 9 | ||||
-rw-r--r-- | lib/_emerge/depgraph.py | 15 | ||||
-rw-r--r-- | lib/portage/dbapi/__init__.py | 11 | ||||
-rw-r--r-- | lib/portage/dbapi/porttree.py | 2 | ||||
-rw-r--r-- | lib/portage/process.py | 254 | ||||
-rw-r--r-- | lib/portage/tests/ebuild/test_doebuild_spawn.py | 3 | ||||
-rw-r--r-- | lib/portage/tests/ebuild/test_ipc_daemon.py | 3 | ||||
-rw-r--r-- | lib/portage/tests/emerge/conftest.py | 4 | ||||
-rw-r--r-- | lib/portage/tests/process/test_spawn_returnproc.py | 2 | ||||
-rw-r--r-- | lib/portage/tests/resolver/test_depth.py | 8 | ||||
-rw-r--r-- | lib/portage/tests/resolver/test_emptytree_reinstall_unsatisfiability.py | 137 | ||||
-rw-r--r-- | lib/portage/tests/resolver/test_useflags.py | 6 | ||||
-rw-r--r-- | lib/portage/tests/util/test_socks5.py | 94 | ||||
-rw-r--r-- | lib/portage/util/_async/SchedulerInterface.py | 9 | ||||
-rw-r--r-- | lib/portage/util/_eventloop/asyncio_event_loop.py | 44 | ||||
-rw-r--r-- | lib/portage/util/futures/_asyncio/__init__.py | 28 | ||||
-rw-r--r-- | lib/portage/util/socks5.py | 42 |
17 files changed, 511 insertions, 160 deletions
diff --git a/lib/_emerge/PollScheduler.py b/lib/_emerge/PollScheduler.py index e5dffd8af..b5bfd20b7 100644 --- a/lib/_emerge/PollScheduler.py +++ b/lib/_emerge/PollScheduler.py @@ -1,4 +1,4 @@ -# Copyright 1999-2023 Gentoo Authors +# Copyright 1999-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import threading @@ -39,6 +39,13 @@ class PollScheduler: self._event_loop, is_background=self._is_background ) + @property + def _loop(self): + """ + Returns the real underlying asyncio loop. + """ + return self._event_loop._loop + def _is_background(self): return self._background diff --git a/lib/_emerge/depgraph.py b/lib/_emerge/depgraph.py index a53eaaba9..63e7ad1ee 100644 --- a/lib/_emerge/depgraph.py +++ b/lib/_emerge/depgraph.py @@ -783,7 +783,7 @@ class depgraph: ebuild_hash=ebuild_hash, portdb=portdb, repo_path=repo_path, - settings=portdb.doebuild_settings, + settings=settings, deallocate_config=deallocate_config, ) proc.addExitListener(self._dynamic_deps_proc_exit(pkg, fake_vartree)) @@ -7640,6 +7640,19 @@ class depgraph: continue if ( + empty + and pkg.installed + and not self._frozen_config.excluded_pkgs.findAtomForPackage( + pkg, modified_use=self._pkg_use_enabled(pkg) + ) + ): + # With --emptytree option we assume no packages + # are installed, so we do not select them. + # But we allow installed packages to satisfy dependency requirements + # if they're explicitly excluded, so we allow them to be selected. + continue + + if ( not pkg.installed and self._frozen_config.excluded_pkgs.findAtomForPackage( pkg, modified_use=self._pkg_use_enabled(pkg) diff --git a/lib/portage/dbapi/__init__.py b/lib/portage/dbapi/__init__.py index 6f95b93a2..9105227c7 100644 --- a/lib/portage/dbapi/__init__.py +++ b/lib/portage/dbapi/__init__.py @@ -1,11 +1,12 @@ -# Copyright 1998-2023 Gentoo Authors +# Copyright 1998-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 __all__ = ["dbapi"] import functools +import logging import re -import warnings +import sys from typing import Any, Dict, List, Optional, Tuple from collections.abc import Sequence @@ -429,7 +430,9 @@ class dbapi: try: aux_update(cpv, metadata_updates) except (InvalidBinaryPackageFormat, CorruptionKeyError) as e: - warnings.warn(e) + logging.warning( + f"{e.__class__.__name__}: {e}", exc_info=sys.exc_info() + ) if onUpdate: onUpdate(maxval, i + 1) if onProgress: @@ -477,6 +480,6 @@ class dbapi: try: self.aux_update(mycpv, mydata) except CorruptionKeyError as e: - warnings.warn(e) + logging.warning(f"{e.__class__.__name__}: {e}", exc_info=sys.exc_info()) continue return moves diff --git a/lib/portage/dbapi/porttree.py b/lib/portage/dbapi/porttree.py index 4eebe1183..de6aa5c82 100644 --- a/lib/portage/dbapi/porttree.py +++ b/lib/portage/dbapi/porttree.py @@ -775,7 +775,7 @@ class portdbapi(dbapi): try: if ( threading.current_thread() is threading.main_thread() - and loop is asyncio._safe_loop() + and loop._loop is asyncio._safe_loop()._loop ): # In this case use self._doebuild_settings_lock to manage concurrency. deallocate_config = loop.create_future() diff --git a/lib/portage/process.py b/lib/portage/process.py index ead11e318..2365778e6 100644 --- a/lib/portage/process.py +++ b/lib/portage/process.py @@ -38,6 +38,7 @@ from portage.const import BASH_BINARY, SANDBOX_BINARY, FAKEROOT_BINARY # PREFIX LOCAL from portage.const import MACOSSANDBOX_BINARY from portage.exception import CommandNotFound +from portage.proxy.objectproxy import ObjectProxy from portage.util._ctypes import find_library, LoadLibrary, ctypes try: @@ -976,114 +977,119 @@ def _exec( signal.signal(signal.SIGQUIT, signal.SIG_DFL) # Unshare (while still uid==0) + have_unshare = False + libc = None if unshare_net or unshare_ipc or unshare_mount or unshare_pid: filename = find_library("c") if filename is not None: libc = LoadLibrary(filename) if libc is not None: - # unshare() may not be supported by libc - if not hasattr(libc, "unshare"): - unshare_net = False - unshare_ipc = False - unshare_mount = False - unshare_pid = False - else: - # Since a failed unshare call could corrupt process - # state, first validate that the call can succeed. - # The parent process should call _unshare_validate - # before it forks, so that all child processes can - # reuse _unshare_validate results that have been - # cached by the parent process. - errno_value = _unshare_validate(unshare_flags) - if errno_value == 0 and libc.unshare(unshare_flags) != 0: - errno_value = ctypes.get_errno() - if errno_value != 0: - involved_features = [] - if unshare_ipc: - involved_features.append("ipc-sandbox") - if unshare_mount: - involved_features.append("mount-sandbox") - if unshare_net: - involved_features.append("network-sandbox") - if unshare_pid: - involved_features.append("pid-sandbox") - - writemsg( - 'Unable to unshare: %s (for FEATURES="%s")\n' - % ( - errno.errorcode.get(errno_value, "?"), - " ".join(involved_features), - ), - noiselevel=-1, - ) - else: - if unshare_pid: - # pid namespace requires us to become init - binary, myargs = ( - portage._python_interpreter, - [ - portage._python_interpreter, - os.path.join(portage._bin_path, "pid-ns-init"), - _unicode_encode("" if uid is None else str(uid)), - _unicode_encode("" if gid is None else str(gid)), - _unicode_encode( - "" - if groups is None - else ",".join(str(group) for group in groups) - ), - _unicode_encode( - "" if umask is None else str(umask) - ), - _unicode_encode( - ",".join(str(fd) for fd in fd_pipes) - ), - binary, - ] - + myargs, - ) - uid = None - gid = None - groups = None - umask = None - - # Use _start_fork for os.fork() error handling, ensuring - # that if exec fails then the child process will display - # a traceback before it exits via os._exit to suppress any - # finally blocks from parent's call stack (bug 345289). - main_child_pid = _start_fork( - _exec2, - args=( - binary, - myargs, - env, - gid, - groups, - uid, - umask, - cwd, - pre_exec, - unshare_net, - unshare_ipc, - unshare_mount, - unshare_pid, - ), - fd_pipes=None, - close_fds=False, - ) - - # Execute a supervisor process which will forward - # signals to init and forward exit status to the - # parent process. The supervisor process runs in - # the global pid namespace, so skip /proc remount - # and other setup that's intended only for the - # init process. - binary, myargs = portage._python_interpreter, [ - portage._python_interpreter, - os.path.join(portage._bin_path, "pid-ns-init"), - str(main_child_pid), - ] - - os.execve(binary, myargs, env) + have_unshare = hasattr(libc, "unshare") + + if not have_unshare: + # unshare() may not be supported by libc + unshare_net = False + unshare_ipc = False + unshare_mount = False + unshare_pid = False + + if unshare_net or unshare_ipc or unshare_mount or unshare_pid: + # Since a failed unshare call could corrupt process + # state, first validate that the call can succeed. + # The parent process should call _unshare_validate + # before it forks, so that all child processes can + # reuse _unshare_validate results that have been + # cached by the parent process. + errno_value = _unshare_validate(unshare_flags) + if errno_value == 0 and libc.unshare(unshare_flags) != 0: + errno_value = ctypes.get_errno() + if errno_value != 0: + involved_features = [] + if unshare_ipc: + involved_features.append("ipc-sandbox") + if unshare_mount: + involved_features.append("mount-sandbox") + if unshare_net: + involved_features.append("network-sandbox") + if unshare_pid: + involved_features.append("pid-sandbox") + + writemsg( + 'Unable to unshare: %s (for FEATURES="%s")\n' + % ( + errno.errorcode.get(errno_value, "?"), + " ".join(involved_features), + ), + noiselevel=-1, + ) + + unshare_net = False + unshare_ipc = False + unshare_mount = False + unshare_pid = False + + if unshare_pid: + # pid namespace requires us to become init + binary, myargs = ( + portage._python_interpreter, + [ + portage._python_interpreter, + os.path.join(portage._bin_path, "pid-ns-init"), + _unicode_encode("" if uid is None else str(uid)), + _unicode_encode("" if gid is None else str(gid)), + _unicode_encode( + "" if groups is None else ",".join(str(group) for group in groups) + ), + _unicode_encode("" if umask is None else str(umask)), + _unicode_encode(",".join(str(fd) for fd in fd_pipes)), + binary, + ] + + myargs, + ) + uid = None + gid = None + groups = None + umask = None + + # Use _start_fork for os.fork() error handling, ensuring + # that if exec fails then the child process will display + # a traceback before it exits via os._exit to suppress any + # finally blocks from parent's call stack (bug 345289). + main_child_pid = _start_fork( + _exec2, + args=( + binary, + myargs, + env, + gid, + groups, + uid, + umask, + cwd, + pre_exec, + unshare_net, + unshare_ipc, + unshare_mount, + unshare_pid, + libc, + ), + fd_pipes=None, + close_fds=False, + ) + + # Execute a supervisor process which will forward + # signals to init and forward exit status to the + # parent process. The supervisor process runs in + # the global pid namespace, so skip /proc remount + # and other setup that's intended only for the + # init process. + binary, myargs = portage._python_interpreter, [ + portage._python_interpreter, + os.path.join(portage._bin_path, "pid-ns-init"), + str(main_child_pid), + ] + + os.execve(binary, myargs, env) # Reachable only if unshare_pid is False. _exec2( @@ -1100,6 +1106,7 @@ def _exec( unshare_ipc, unshare_mount, unshare_pid, + libc, ) @@ -1117,6 +1124,7 @@ def _exec2( unshare_ipc, unshare_mount, unshare_pid, + libc, ): if unshare_mount: # mark the whole filesystem as slave to avoid @@ -1506,8 +1514,44 @@ def _start_proc( proc.start() # ForkProcess conveniently holds a MultiprocessingProcess - # instance that is suitable to return here. - return proc._proc + # instance that is suitable to return here, but use _GCProtector + # to protect the ForkProcess instance from being garbage collected + # and triggering messages like this (bug 925456): + # [ERROR] Task was destroyed but it is pending! + return _GCProtector(proc._proc, proc.async_wait) + + +class _GCProtector(ObjectProxy): + """ + Proxy a target object, and also hold a reference to something + extra in order to protect it from garbage collection. Override + the wait method to first call target's wait method and then + wait for extra (a coroutine function) before returning the result. + """ + + __slots__ = ("_extra", "_target") + + def __init__(self, target, extra): + super().__init__() + object.__setattr__(self, "_target", target) + object.__setattr__(self, "_extra", extra) + + def _get_target(self): + return object.__getattribute__(self, "_target") + + def __getattribute__(self, attr): + if attr == "wait": + return object.__getattribute__(self, attr) + return getattr(object.__getattribute__(self, "_target"), attr) + + async def wait(self): + """ + Wrap the target's wait method to also wait for an extra + coroutine function. + """ + result = await object.__getattribute__(self, "_target").wait() + await object.__getattribute__(self, "_extra")() + return result def find_binary(binary): diff --git a/lib/portage/tests/ebuild/test_doebuild_spawn.py b/lib/portage/tests/ebuild/test_doebuild_spawn.py index 9fb2c7fdd..cac844f8f 100644 --- a/lib/portage/tests/ebuild/test_doebuild_spawn.py +++ b/lib/portage/tests/ebuild/test_doebuild_spawn.py @@ -1,4 +1,4 @@ -# Copyright 2010-2015 Gentoo Foundation +# Copyright 2010-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import textwrap @@ -86,6 +86,7 @@ class DoebuildSpawnTestCase(TestCase): open(os.path.join(settings["T"], "environment"), "wb").close() scheduler = SchedulerInterface(global_event_loop()) + self.assertTrue(scheduler._loop is global_event_loop()._loop) for phase in ("_internal_test",): # Test EbuildSpawnProcess by calling doebuild.spawn() with # returnpid=False. This case is no longer used by portage diff --git a/lib/portage/tests/ebuild/test_ipc_daemon.py b/lib/portage/tests/ebuild/test_ipc_daemon.py index 0beb69ddf..b8777fe94 100644 --- a/lib/portage/tests/ebuild/test_ipc_daemon.py +++ b/lib/portage/tests/ebuild/test_ipc_daemon.py @@ -1,4 +1,4 @@ -# Copyright 2010-2023 Gentoo Authors +# Copyright 2010-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import tempfile @@ -77,6 +77,7 @@ class IpcDaemonTestCase(TestCase): task_scheduler = TaskScheduler( iter([daemon, proc]), max_jobs=2, event_loop=event_loop ) + self.assertTrue(task_scheduler._loop is event_loop._loop) self.received_command = False diff --git a/lib/portage/tests/emerge/conftest.py b/lib/portage/tests/emerge/conftest.py index d9aec7041..356e09879 100644 --- a/lib/portage/tests/emerge/conftest.py +++ b/lib/portage/tests/emerge/conftest.py @@ -406,7 +406,7 @@ def async_loop(): yield asyncio._wrap_loop() -@pytest.fixture(params=SUPPORTED_GENTOO_BINPKG_FORMATS, scope="module") +@pytest.fixture(params=SUPPORTED_GENTOO_BINPKG_FORMATS, scope="function") def playground(request, tmp_path_factory): """Fixture that provides instances of ``ResolverPlayground`` each one with one supported value for ``BINPKG_FORMAT``.""" @@ -805,7 +805,7 @@ def _generate_all_baseline_commands(playground, binhost): test_commands["binhost emerge"] = Noop() else: # The next emerge has been added to split this test from the rest: - make_package = Emerge("--buildpkg", "dev-libs/A") + make_package = Emerge("-e", "--buildpkg", "dev-libs/A") getbinpkgonly = Emerge( "-e", "--getbinpkgonly", diff --git a/lib/portage/tests/process/test_spawn_returnproc.py b/lib/portage/tests/process/test_spawn_returnproc.py index 6d823d9c3..8fbf54d0d 100644 --- a/lib/portage/tests/process/test_spawn_returnproc.py +++ b/lib/portage/tests/process/test_spawn_returnproc.py @@ -32,7 +32,7 @@ class SpawnReturnProcTestCase(TestCase): loop = global_event_loop() async def watch_pid(): - proc = spawn([sleep_binary, 9999], returnproc=True) + proc = spawn([sleep_binary, "9999"], returnproc=True) proc.terminate() self.assertEqual(await proc.wait(), -signal.SIGTERM) diff --git a/lib/portage/tests/resolver/test_depth.py b/lib/portage/tests/resolver/test_depth.py index 9c5289f7d..ab5f8e7ec 100644 --- a/lib/portage/tests/resolver/test_depth.py +++ b/lib/portage/tests/resolver/test_depth.py @@ -1,4 +1,4 @@ -# Copyright 2011-2020 Gentoo Authors +# Copyright 2011-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 from portage.tests import TestCase @@ -318,6 +318,12 @@ class ResolverDepthTestCase(TestCase): "sys-fs/udev-164", ], ), + ResolverPlaygroundTestCase( + ["@world"], + options={"--emptytree": True, "--exclude": ["dev-libs/B"]}, + success=True, + mergelist=["dev-libs/C-2", "dev-libs/A-2"], + ), ) playground = ResolverPlayground( diff --git a/lib/portage/tests/resolver/test_emptytree_reinstall_unsatisfiability.py b/lib/portage/tests/resolver/test_emptytree_reinstall_unsatisfiability.py new file mode 100644 index 000000000..fcdc01d7f --- /dev/null +++ b/lib/portage/tests/resolver/test_emptytree_reinstall_unsatisfiability.py @@ -0,0 +1,137 @@ +# Copyright 2024 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +from portage.tests import TestCase +from portage.tests.resolver.ResolverPlayground import ( + ResolverPlayground, + ResolverPlaygroundTestCase, +) + + +class EmptytreeReinstallUnsatisfiabilityTestCase(TestCase): + def testEmptytreeReinstallUnsatisfiability(self): + """ + Tests to check if emerge fails and complains when --emptytree + package dependency graph reinstall is unsatisfied, even if the already + installed packages successfully satisfy the dependency tree. + + See bug #651018 where emerge silently skips package + reinstalls because of unsatisfied use flag requirements. + """ + ebuilds = { + "dev-libs/A-1": { + "DEPEND": "dev-libs/B", + "RDEPEND": "dev-libs/B", + "EAPI": "2", + }, + "dev-libs/B-1": { + "DEPEND": "dev-libs/C[foo]", + "RDEPEND": "dev-libs/C[foo]", + "EAPI": "2", + }, + "dev-libs/C-1": { + "IUSE": "foo", + "EAPI": "2", + }, + "dev-libs/X-1": { + "DEPEND": "dev-libs/Y[-baz]", + "RDEPEND": "dev-libs/Y[-baz]", + "EAPI": "2", + }, + "dev-libs/Y-1": { + "IUSE": "baz", + "EAPI": "2", + }, + "dev-libs/Z-1": { + "DEPEND": "dev-libs/W", + "RDEPEND": "dev-libs/W", + "EAPI": "2", + }, + "dev-libs/W-1": { + "EAPI": "2", + }, + } + + installed = { + "dev-libs/A-1": { + "DEPEND": "dev-libs/B", + "RDEPEND": "dev-libs/B", + "EAPI": "2", + }, + "dev-libs/B-1": { + "DEPEND": "dev-libs/C[foo]", + "RDEPEND": "dev-libs/C[foo]", + "EAPI": "2", + }, + "dev-libs/C-1": { + "IUSE": "foo", + "USE": "foo", + "EAPI": "2", + }, + "dev-libs/X-1": { + "DEPEND": "dev-libs/Y[-baz]", + "RDEPEND": "dev-libs/Y[-baz]", + "EAPI": "2", + }, + "dev-libs/Y-1": { + "IUSE": "baz", + "USE": "-baz", + "EAPI": "2", + }, + "dev-libs/Z-1": { + "DEPEND": "dev-libs/W", + "RDEPEND": "dev-libs/W", + "EAPI": "2", + }, + "dev-libs/W-1": { + "EAPI": "2", + }, + } + + user_config = { + "package.use": ("dev-libs/Y baz",), + "package.mask": ("dev-libs/W",), + } + + world = ["dev-libs/X"] + + test_cases = ( + ResolverPlaygroundTestCase( + ["dev-libs/A"], + options={"--emptytree": True}, + success=False, + mergelist=["dev-libs/C-1", "dev-libs/B-1", "dev-libs/A-1"], + use_changes={"dev-libs/C-1": {"foo": True}}, + ), + ResolverPlaygroundTestCase( + ["dev-libs/A"], + options={"--emptytree": True, "--exclude": ["dev-libs/C"]}, + success=True, + mergelist=["dev-libs/B-1", "dev-libs/A-1"], + ), + ResolverPlaygroundTestCase( + ["@world"], + options={"--emptytree": True}, + success=False, + mergelist=["dev-libs/Y-1", "dev-libs/X-1"], + use_changes={"dev-libs/Y-1": {"baz": False}}, + ), + ResolverPlaygroundTestCase( + ["dev-libs/Z"], + options={"--emptytree": True}, + success=False, + ), + ) + + playground = ResolverPlayground( + ebuilds=ebuilds, + installed=installed, + user_config=user_config, + world=world, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual(test_case.test_success, True, test_case.fail_msg) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_useflags.py b/lib/portage/tests/resolver/test_useflags.py index 86684f7f2..142a31c7f 100644 --- a/lib/portage/tests/resolver/test_useflags.py +++ b/lib/portage/tests/resolver/test_useflags.py @@ -1,4 +1,4 @@ -# Copyright 2014 Gentoo Foundation +# Copyright 2014-2024 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 import sys @@ -292,8 +292,8 @@ class UseFlagsTestCase(TestCase): "--usepkg": True, }, success=False, - mergelist=["[binary]dev-libs/A-2", "dev-libs/B-1"], - slot_collision_solutions=[], + mergelist=None, + slot_collision_solutions=None, ), ) diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py index 987b41af2..4a6d08169 100644 --- a/lib/portage/tests/util/test_socks5.py +++ b/lib/portage/tests/util/test_socks5.py @@ -1,6 +1,7 @@ -# Copyright 2019-2021 Gentoo Authors +# Copyright 2019-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +import asyncio import functools import shutil import socket @@ -10,8 +11,9 @@ import time import portage from portage.tests import TestCase -from portage.util._eventloop.global_event_loop import global_event_loop from portage.util import socks5 +from portage.util.futures.executor.fork import ForkExecutor +from portage.util._eventloop.global_event_loop import global_event_loop from portage.const import PORTAGE_BIN_PATH from http.server import BaseHTTPRequestHandler, HTTPServer @@ -88,18 +90,20 @@ class AsyncHTTPServerTestCase(TestCase): if f is not None: f.close() - def test_http_server(self): + async def _test_http_server(self): + asyncio.run(self._test_http_server()) + + async def _test_http_server(self): host = "127.0.0.1" content = b"Hello World!\n" path = "/index.html" - loop = global_event_loop() + + loop = asyncio.get_running_loop() for i in range(2): with AsyncHTTPServer(host, {path: content}, loop) as server: for j in range(2): - result = loop.run_until_complete( - loop.run_in_executor( - None, self._fetch_directly, host, server.server_port, path - ) + result = await loop.run_in_executor( + None, self._fetch_directly, host, server.server_port, path ) self.assertEqual(result, content) @@ -177,15 +181,20 @@ class Socks5ServerTestCase(TestCase): return f.read() def test_socks5_proxy(self): - loop = global_event_loop() + asyncio.run(self._test_socks5_proxy()) + + async def _test_socks5_proxy(self): + loop = asyncio.get_running_loop() host = "127.0.0.1" content = b"Hello World!" path = "/index.html" proxy = None tempdir = tempfile.mkdtemp() + previous_exithandlers = portage.process._exithandlers try: + portage.process._exithandlers = [] with AsyncHTTPServer(host, {path: content}, loop) as server: settings = { "PORTAGE_TMPDIR": tempdir, @@ -193,20 +202,63 @@ class Socks5ServerTestCase(TestCase): } proxy = socks5.get_socks5_proxy(settings) - loop.run_until_complete(socks5.proxy.ready()) - - result = loop.run_until_complete( - loop.run_in_executor( - None, - self._fetch_via_proxy, - proxy, - host, - server.server_port, - path, - ) + await socks5.proxy.ready() + + result = await loop.run_in_executor( + None, + self._fetch_via_proxy, + proxy, + host, + server.server_port, + path, ) self.assertEqual(result, content) finally: - socks5.proxy.stop() + try: + # Also run_exitfuncs to test atexit hook cleanup. + await socks5.proxy.stop() + self.assertNotEqual(portage.process._exithandlers, []) + portage.process.run_exitfuncs() + self.assertEqual(portage.process._exithandlers, []) + finally: + portage.process._exithandlers = previous_exithandlers + shutil.rmtree(tempdir) + + +class Socks5ServerLoopCloseTestCase(TestCase): + """ + For bug 925240, test that the socks5 proxy is automatically + terminated when the main event loop is closed, using a subprocess + for isolation. + """ + + def testSocks5ServerLoopClose(self): + asyncio.run(self._testSocks5ServerLoopClose()) + + async def _testSocks5ServerLoopClose(self): + loop = asyncio.get_running_loop() + self.assertEqual( + await loop.run_in_executor( + ForkExecutor(loop=loop), self._testSocks5ServerLoopCloseSubprocess + ), + True, + ) + + @staticmethod + def _testSocks5ServerLoopCloseSubprocess(): + loop = global_event_loop() + tempdir = tempfile.mkdtemp() + try: + settings = { + "PORTAGE_TMPDIR": tempdir, + "PORTAGE_BIN_PATH": PORTAGE_BIN_PATH, + } + + socks5.get_socks5_proxy(settings) + loop.run_until_complete(socks5.proxy.ready()) + finally: + loop.close() shutil.rmtree(tempdir) + + return not socks5.proxy.is_running() diff --git a/lib/portage/util/_async/SchedulerInterface.py b/lib/portage/util/_async/SchedulerInterface.py index 43a42adff..485958491 100644 --- a/lib/portage/util/_async/SchedulerInterface.py +++ b/lib/portage/util/_async/SchedulerInterface.py @@ -1,4 +1,4 @@ -# Copyright 2012-2021 Gentoo Authors +# Copyright 2012-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import gzip @@ -49,6 +49,13 @@ class SchedulerInterface(SlotObject): for k in self._event_loop_attrs: setattr(self, k, getattr(event_loop, k)) + @property + def _loop(self): + """ + Returns the real underlying asyncio loop. + """ + return self._event_loop._loop + @staticmethod def _return_false(): return False diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py index b9e96bb20..ee9e4c60e 100644 --- a/lib/portage/util/_eventloop/asyncio_event_loop.py +++ b/lib/portage/util/_eventloop/asyncio_event_loop.py @@ -1,8 +1,9 @@ -# Copyright 2018-2023 Gentoo Authors +# Copyright 2018-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import os import signal +import threading import asyncio as _real_asyncio from asyncio.events import AbstractEventLoop as _AbstractEventLoop @@ -14,6 +15,7 @@ except ImportError: PidfdChildWatcher = None import portage +from portage.util import socks5 class AsyncioEventLoop(_AbstractEventLoop): @@ -25,18 +27,14 @@ class AsyncioEventLoop(_AbstractEventLoop): def __init__(self, loop=None): loop = loop or _real_asyncio.get_event_loop() self._loop = loop - self.run_until_complete = ( - self._run_until_complete - if portage._internal_caller - else loop.run_until_complete - ) + self.run_until_complete = self._run_until_complete self.call_soon = loop.call_soon self.call_soon_threadsafe = loop.call_soon_threadsafe self.call_later = loop.call_later self.call_at = loop.call_at self.is_running = loop.is_running self.is_closed = loop.is_closed - self.close = loop.close + self.close = self._close self.create_future = ( loop.create_future if hasattr(loop, "create_future") @@ -55,10 +53,36 @@ class AsyncioEventLoop(_AbstractEventLoop): self.get_debug = loop.get_debug self._wakeup_fd = -1 self._child_watcher = None + # Used to drop recursive calls to _close. + self._closing = False + # Initialized in _run_until_complete. + self._is_main = None if portage._internal_caller: loop.set_exception_handler(self._internal_caller_exception_handler) + def _close(self): + """ + Before closing the main loop, run portage.process.run_exitfuncs() + with the event loop running so that anything attached can clean + itself up (like the socks5 ProxyManager for bug 925240). + """ + if not (self._closing or self.is_closed()): + self._closing = True + if self._is_main: + self.run_until_complete(self._close_main()) + self._loop.close() + self._closing = False + + async def _close_main(self): + # Even though this has an exit hook, invoke it here so that + # we can properly wait for it and avoid messages like this: + # [ERROR] Task was destroyed but it is pending! + if socks5.proxy.is_running(): + await socks5.proxy.stop() + + portage.process.run_exitfuncs() + @staticmethod def _internal_caller_exception_handler(loop, context): """ @@ -139,6 +163,12 @@ class AsyncioEventLoop(_AbstractEventLoop): In order to avoid potential interference with API consumers, this implementation is only used when portage._internal_caller is True. """ + if self._is_main is None: + self._is_main = threading.current_thread() is threading.main_thread() + + if not portage._internal_caller: + return self._loop.run_until_complete(future) + if self._wakeup_fd != -1: signal.set_wakeup_fd(self._wakeup_fd) self._wakeup_fd = -1 diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py index 22241f335..4eecc46a8 100644 --- a/lib/portage/util/futures/_asyncio/__init__.py +++ b/lib/portage/util/futures/_asyncio/__init__.py @@ -325,13 +325,37 @@ def _safe_loop(): def _get_running_loop(): + """ + This calls the real asyncio get_running_loop() and wraps that with + portage's internal AsyncioEventLoop wrapper. If there is no running + asyncio event loop but portage has a reference to another running + loop in this thread, then use that instead. + + This behavior enables portage internals to use the real asyncio.run + while remaining compatible with internal code that does not use the + real asyncio.run. + """ + try: + _loop = _real_asyncio.get_running_loop() + except RuntimeError: + _loop = None + with _thread_weakrefs.lock: if _thread_weakrefs.pid == portage.getpid(): try: loop = _thread_weakrefs.loops[threading.get_ident()] except KeyError: - return None - return loop if loop.is_running() else None + pass + else: + if _loop is loop._loop: + return loop + elif _loop is None: + return loop if loop.is_running() else None + + # If _loop it not None here it means it was probably a temporary + # loop created by asyncio.run, so we don't try to cache it, and + # just return a temporary wrapper. + return None if _loop is None else _AsyncioEventLoop(loop=_loop) def _thread_weakrefs_atexit(): diff --git a/lib/portage/util/socks5.py b/lib/portage/util/socks5.py index 6c68ff410..f8fcdf9fc 100644 --- a/lib/portage/util/socks5.py +++ b/lib/portage/util/socks5.py @@ -2,15 +2,23 @@ # Copyright 2015-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +import asyncio import errno import os import socket +from typing import Union + +import portage + +portage.proxy.lazyimport.lazyimport( + globals(), + "portage.util._eventloop.global_event_loop:global_event_loop", +) import portage.data from portage import _python_interpreter from portage.data import portage_gid, portage_uid, userpriv_groups from portage.process import atexit_register, spawn -from portage.util.futures import asyncio class ProxyManager: @@ -57,23 +65,41 @@ class ProxyManager: **spawn_kwargs, ) - def stop(self): + def stop(self) -> Union[None, asyncio.Future]: """ Stop the SOCKSv5 server. + + If there is a running asyncio event loop then asyncio.Future is + returned which should be used to wait for the server process + to exit. """ + future = None + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None if self._proc is not None: self._proc.terminate() - loop = asyncio.get_event_loop() - if self._proc_waiter is None: - self._proc_waiter = asyncio.ensure_future(self._proc.wait(), loop) - if loop.is_running(): - self._proc_waiter.add_done_callback(lambda future: future.result()) + if loop is None: + # In this case spawn internals would have used + # portage's global loop when attaching a waiter to + # self._proc, so we are obligated to use that. + global_event_loop().run_until_complete(self._proc.wait()) else: - loop.run_until_complete(self._proc_waiter) + if self._proc_waiter is None: + self._proc_waiter = asyncio.ensure_future( + self._proc.wait(), loop=loop + ) + future = asyncio.shield(self._proc_waiter) + + if loop is not None and future is None: + future = loop.create_future() + future.set_result(None) self.socket_path = None self._proc = None self._proc_waiter = None + return future def is_running(self): """ |