aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/_emerge/PollScheduler.py9
-rw-r--r--lib/_emerge/depgraph.py15
-rw-r--r--lib/portage/dbapi/__init__.py11
-rw-r--r--lib/portage/dbapi/porttree.py2
-rw-r--r--lib/portage/process.py254
-rw-r--r--lib/portage/tests/ebuild/test_doebuild_spawn.py3
-rw-r--r--lib/portage/tests/ebuild/test_ipc_daemon.py3
-rw-r--r--lib/portage/tests/emerge/conftest.py4
-rw-r--r--lib/portage/tests/process/test_spawn_returnproc.py2
-rw-r--r--lib/portage/tests/resolver/test_depth.py8
-rw-r--r--lib/portage/tests/resolver/test_emptytree_reinstall_unsatisfiability.py137
-rw-r--r--lib/portage/tests/resolver/test_useflags.py6
-rw-r--r--lib/portage/tests/util/test_socks5.py94
-rw-r--r--lib/portage/util/_async/SchedulerInterface.py9
-rw-r--r--lib/portage/util/_eventloop/asyncio_event_loop.py44
-rw-r--r--lib/portage/util/futures/_asyncio/__init__.py28
-rw-r--r--lib/portage/util/socks5.py42
17 files changed, 511 insertions, 160 deletions
diff --git a/lib/_emerge/PollScheduler.py b/lib/_emerge/PollScheduler.py
index e5dffd8af..b5bfd20b7 100644
--- a/lib/_emerge/PollScheduler.py
+++ b/lib/_emerge/PollScheduler.py
@@ -1,4 +1,4 @@
-# Copyright 1999-2023 Gentoo Authors
+# Copyright 1999-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import threading
@@ -39,6 +39,13 @@ class PollScheduler:
self._event_loop, is_background=self._is_background
)
+ @property
+ def _loop(self):
+ """
+ Returns the real underlying asyncio loop.
+ """
+ return self._event_loop._loop
+
def _is_background(self):
return self._background
diff --git a/lib/_emerge/depgraph.py b/lib/_emerge/depgraph.py
index a53eaaba9..63e7ad1ee 100644
--- a/lib/_emerge/depgraph.py
+++ b/lib/_emerge/depgraph.py
@@ -783,7 +783,7 @@ class depgraph:
ebuild_hash=ebuild_hash,
portdb=portdb,
repo_path=repo_path,
- settings=portdb.doebuild_settings,
+ settings=settings,
deallocate_config=deallocate_config,
)
proc.addExitListener(self._dynamic_deps_proc_exit(pkg, fake_vartree))
@@ -7640,6 +7640,19 @@ class depgraph:
continue
if (
+ empty
+ and pkg.installed
+ and not self._frozen_config.excluded_pkgs.findAtomForPackage(
+ pkg, modified_use=self._pkg_use_enabled(pkg)
+ )
+ ):
+ # With --emptytree option we assume no packages
+ # are installed, so we do not select them.
+ # But we allow installed packages to satisfy dependency requirements
+ # if they're explicitly excluded, so we allow them to be selected.
+ continue
+
+ if (
not pkg.installed
and self._frozen_config.excluded_pkgs.findAtomForPackage(
pkg, modified_use=self._pkg_use_enabled(pkg)
diff --git a/lib/portage/dbapi/__init__.py b/lib/portage/dbapi/__init__.py
index 6f95b93a2..9105227c7 100644
--- a/lib/portage/dbapi/__init__.py
+++ b/lib/portage/dbapi/__init__.py
@@ -1,11 +1,12 @@
-# Copyright 1998-2023 Gentoo Authors
+# Copyright 1998-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
__all__ = ["dbapi"]
import functools
+import logging
import re
-import warnings
+import sys
from typing import Any, Dict, List, Optional, Tuple
from collections.abc import Sequence
@@ -429,7 +430,9 @@ class dbapi:
try:
aux_update(cpv, metadata_updates)
except (InvalidBinaryPackageFormat, CorruptionKeyError) as e:
- warnings.warn(e)
+ logging.warning(
+ f"{e.__class__.__name__}: {e}", exc_info=sys.exc_info()
+ )
if onUpdate:
onUpdate(maxval, i + 1)
if onProgress:
@@ -477,6 +480,6 @@ class dbapi:
try:
self.aux_update(mycpv, mydata)
except CorruptionKeyError as e:
- warnings.warn(e)
+ logging.warning(f"{e.__class__.__name__}: {e}", exc_info=sys.exc_info())
continue
return moves
diff --git a/lib/portage/dbapi/porttree.py b/lib/portage/dbapi/porttree.py
index 4eebe1183..de6aa5c82 100644
--- a/lib/portage/dbapi/porttree.py
+++ b/lib/portage/dbapi/porttree.py
@@ -775,7 +775,7 @@ class portdbapi(dbapi):
try:
if (
threading.current_thread() is threading.main_thread()
- and loop is asyncio._safe_loop()
+ and loop._loop is asyncio._safe_loop()._loop
):
# In this case use self._doebuild_settings_lock to manage concurrency.
deallocate_config = loop.create_future()
diff --git a/lib/portage/process.py b/lib/portage/process.py
index ead11e318..2365778e6 100644
--- a/lib/portage/process.py
+++ b/lib/portage/process.py
@@ -38,6 +38,7 @@ from portage.const import BASH_BINARY, SANDBOX_BINARY, FAKEROOT_BINARY
# PREFIX LOCAL
from portage.const import MACOSSANDBOX_BINARY
from portage.exception import CommandNotFound
+from portage.proxy.objectproxy import ObjectProxy
from portage.util._ctypes import find_library, LoadLibrary, ctypes
try:
@@ -976,114 +977,119 @@ def _exec(
signal.signal(signal.SIGQUIT, signal.SIG_DFL)
# Unshare (while still uid==0)
+ have_unshare = False
+ libc = None
if unshare_net or unshare_ipc or unshare_mount or unshare_pid:
filename = find_library("c")
if filename is not None:
libc = LoadLibrary(filename)
if libc is not None:
- # unshare() may not be supported by libc
- if not hasattr(libc, "unshare"):
- unshare_net = False
- unshare_ipc = False
- unshare_mount = False
- unshare_pid = False
- else:
- # Since a failed unshare call could corrupt process
- # state, first validate that the call can succeed.
- # The parent process should call _unshare_validate
- # before it forks, so that all child processes can
- # reuse _unshare_validate results that have been
- # cached by the parent process.
- errno_value = _unshare_validate(unshare_flags)
- if errno_value == 0 and libc.unshare(unshare_flags) != 0:
- errno_value = ctypes.get_errno()
- if errno_value != 0:
- involved_features = []
- if unshare_ipc:
- involved_features.append("ipc-sandbox")
- if unshare_mount:
- involved_features.append("mount-sandbox")
- if unshare_net:
- involved_features.append("network-sandbox")
- if unshare_pid:
- involved_features.append("pid-sandbox")
-
- writemsg(
- 'Unable to unshare: %s (for FEATURES="%s")\n'
- % (
- errno.errorcode.get(errno_value, "?"),
- " ".join(involved_features),
- ),
- noiselevel=-1,
- )
- else:
- if unshare_pid:
- # pid namespace requires us to become init
- binary, myargs = (
- portage._python_interpreter,
- [
- portage._python_interpreter,
- os.path.join(portage._bin_path, "pid-ns-init"),
- _unicode_encode("" if uid is None else str(uid)),
- _unicode_encode("" if gid is None else str(gid)),
- _unicode_encode(
- ""
- if groups is None
- else ",".join(str(group) for group in groups)
- ),
- _unicode_encode(
- "" if umask is None else str(umask)
- ),
- _unicode_encode(
- ",".join(str(fd) for fd in fd_pipes)
- ),
- binary,
- ]
- + myargs,
- )
- uid = None
- gid = None
- groups = None
- umask = None
-
- # Use _start_fork for os.fork() error handling, ensuring
- # that if exec fails then the child process will display
- # a traceback before it exits via os._exit to suppress any
- # finally blocks from parent's call stack (bug 345289).
- main_child_pid = _start_fork(
- _exec2,
- args=(
- binary,
- myargs,
- env,
- gid,
- groups,
- uid,
- umask,
- cwd,
- pre_exec,
- unshare_net,
- unshare_ipc,
- unshare_mount,
- unshare_pid,
- ),
- fd_pipes=None,
- close_fds=False,
- )
-
- # Execute a supervisor process which will forward
- # signals to init and forward exit status to the
- # parent process. The supervisor process runs in
- # the global pid namespace, so skip /proc remount
- # and other setup that's intended only for the
- # init process.
- binary, myargs = portage._python_interpreter, [
- portage._python_interpreter,
- os.path.join(portage._bin_path, "pid-ns-init"),
- str(main_child_pid),
- ]
-
- os.execve(binary, myargs, env)
+ have_unshare = hasattr(libc, "unshare")
+
+ if not have_unshare:
+ # unshare() may not be supported by libc
+ unshare_net = False
+ unshare_ipc = False
+ unshare_mount = False
+ unshare_pid = False
+
+ if unshare_net or unshare_ipc or unshare_mount or unshare_pid:
+ # Since a failed unshare call could corrupt process
+ # state, first validate that the call can succeed.
+ # The parent process should call _unshare_validate
+ # before it forks, so that all child processes can
+ # reuse _unshare_validate results that have been
+ # cached by the parent process.
+ errno_value = _unshare_validate(unshare_flags)
+ if errno_value == 0 and libc.unshare(unshare_flags) != 0:
+ errno_value = ctypes.get_errno()
+ if errno_value != 0:
+ involved_features = []
+ if unshare_ipc:
+ involved_features.append("ipc-sandbox")
+ if unshare_mount:
+ involved_features.append("mount-sandbox")
+ if unshare_net:
+ involved_features.append("network-sandbox")
+ if unshare_pid:
+ involved_features.append("pid-sandbox")
+
+ writemsg(
+ 'Unable to unshare: %s (for FEATURES="%s")\n'
+ % (
+ errno.errorcode.get(errno_value, "?"),
+ " ".join(involved_features),
+ ),
+ noiselevel=-1,
+ )
+
+ unshare_net = False
+ unshare_ipc = False
+ unshare_mount = False
+ unshare_pid = False
+
+ if unshare_pid:
+ # pid namespace requires us to become init
+ binary, myargs = (
+ portage._python_interpreter,
+ [
+ portage._python_interpreter,
+ os.path.join(portage._bin_path, "pid-ns-init"),
+ _unicode_encode("" if uid is None else str(uid)),
+ _unicode_encode("" if gid is None else str(gid)),
+ _unicode_encode(
+ "" if groups is None else ",".join(str(group) for group in groups)
+ ),
+ _unicode_encode("" if umask is None else str(umask)),
+ _unicode_encode(",".join(str(fd) for fd in fd_pipes)),
+ binary,
+ ]
+ + myargs,
+ )
+ uid = None
+ gid = None
+ groups = None
+ umask = None
+
+ # Use _start_fork for os.fork() error handling, ensuring
+ # that if exec fails then the child process will display
+ # a traceback before it exits via os._exit to suppress any
+ # finally blocks from parent's call stack (bug 345289).
+ main_child_pid = _start_fork(
+ _exec2,
+ args=(
+ binary,
+ myargs,
+ env,
+ gid,
+ groups,
+ uid,
+ umask,
+ cwd,
+ pre_exec,
+ unshare_net,
+ unshare_ipc,
+ unshare_mount,
+ unshare_pid,
+ libc,
+ ),
+ fd_pipes=None,
+ close_fds=False,
+ )
+
+ # Execute a supervisor process which will forward
+ # signals to init and forward exit status to the
+ # parent process. The supervisor process runs in
+ # the global pid namespace, so skip /proc remount
+ # and other setup that's intended only for the
+ # init process.
+ binary, myargs = portage._python_interpreter, [
+ portage._python_interpreter,
+ os.path.join(portage._bin_path, "pid-ns-init"),
+ str(main_child_pid),
+ ]
+
+ os.execve(binary, myargs, env)
# Reachable only if unshare_pid is False.
_exec2(
@@ -1100,6 +1106,7 @@ def _exec(
unshare_ipc,
unshare_mount,
unshare_pid,
+ libc,
)
@@ -1117,6 +1124,7 @@ def _exec2(
unshare_ipc,
unshare_mount,
unshare_pid,
+ libc,
):
if unshare_mount:
# mark the whole filesystem as slave to avoid
@@ -1506,8 +1514,44 @@ def _start_proc(
proc.start()
# ForkProcess conveniently holds a MultiprocessingProcess
- # instance that is suitable to return here.
- return proc._proc
+ # instance that is suitable to return here, but use _GCProtector
+ # to protect the ForkProcess instance from being garbage collected
+ # and triggering messages like this (bug 925456):
+ # [ERROR] Task was destroyed but it is pending!
+ return _GCProtector(proc._proc, proc.async_wait)
+
+
+class _GCProtector(ObjectProxy):
+ """
+ Proxy a target object, and also hold a reference to something
+ extra in order to protect it from garbage collection. Override
+ the wait method to first call target's wait method and then
+ wait for extra (a coroutine function) before returning the result.
+ """
+
+ __slots__ = ("_extra", "_target")
+
+ def __init__(self, target, extra):
+ super().__init__()
+ object.__setattr__(self, "_target", target)
+ object.__setattr__(self, "_extra", extra)
+
+ def _get_target(self):
+ return object.__getattribute__(self, "_target")
+
+ def __getattribute__(self, attr):
+ if attr == "wait":
+ return object.__getattribute__(self, attr)
+ return getattr(object.__getattribute__(self, "_target"), attr)
+
+ async def wait(self):
+ """
+ Wrap the target's wait method to also wait for an extra
+ coroutine function.
+ """
+ result = await object.__getattribute__(self, "_target").wait()
+ await object.__getattribute__(self, "_extra")()
+ return result
def find_binary(binary):
diff --git a/lib/portage/tests/ebuild/test_doebuild_spawn.py b/lib/portage/tests/ebuild/test_doebuild_spawn.py
index 9fb2c7fdd..cac844f8f 100644
--- a/lib/portage/tests/ebuild/test_doebuild_spawn.py
+++ b/lib/portage/tests/ebuild/test_doebuild_spawn.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2015 Gentoo Foundation
+# Copyright 2010-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import textwrap
@@ -86,6 +86,7 @@ class DoebuildSpawnTestCase(TestCase):
open(os.path.join(settings["T"], "environment"), "wb").close()
scheduler = SchedulerInterface(global_event_loop())
+ self.assertTrue(scheduler._loop is global_event_loop()._loop)
for phase in ("_internal_test",):
# Test EbuildSpawnProcess by calling doebuild.spawn() with
# returnpid=False. This case is no longer used by portage
diff --git a/lib/portage/tests/ebuild/test_ipc_daemon.py b/lib/portage/tests/ebuild/test_ipc_daemon.py
index 0beb69ddf..b8777fe94 100644
--- a/lib/portage/tests/ebuild/test_ipc_daemon.py
+++ b/lib/portage/tests/ebuild/test_ipc_daemon.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2023 Gentoo Authors
+# Copyright 2010-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import tempfile
@@ -77,6 +77,7 @@ class IpcDaemonTestCase(TestCase):
task_scheduler = TaskScheduler(
iter([daemon, proc]), max_jobs=2, event_loop=event_loop
)
+ self.assertTrue(task_scheduler._loop is event_loop._loop)
self.received_command = False
diff --git a/lib/portage/tests/emerge/conftest.py b/lib/portage/tests/emerge/conftest.py
index d9aec7041..356e09879 100644
--- a/lib/portage/tests/emerge/conftest.py
+++ b/lib/portage/tests/emerge/conftest.py
@@ -406,7 +406,7 @@ def async_loop():
yield asyncio._wrap_loop()
-@pytest.fixture(params=SUPPORTED_GENTOO_BINPKG_FORMATS, scope="module")
+@pytest.fixture(params=SUPPORTED_GENTOO_BINPKG_FORMATS, scope="function")
def playground(request, tmp_path_factory):
"""Fixture that provides instances of ``ResolverPlayground``
each one with one supported value for ``BINPKG_FORMAT``."""
@@ -805,7 +805,7 @@ def _generate_all_baseline_commands(playground, binhost):
test_commands["binhost emerge"] = Noop()
else:
# The next emerge has been added to split this test from the rest:
- make_package = Emerge("--buildpkg", "dev-libs/A")
+ make_package = Emerge("-e", "--buildpkg", "dev-libs/A")
getbinpkgonly = Emerge(
"-e",
"--getbinpkgonly",
diff --git a/lib/portage/tests/process/test_spawn_returnproc.py b/lib/portage/tests/process/test_spawn_returnproc.py
index 6d823d9c3..8fbf54d0d 100644
--- a/lib/portage/tests/process/test_spawn_returnproc.py
+++ b/lib/portage/tests/process/test_spawn_returnproc.py
@@ -32,7 +32,7 @@ class SpawnReturnProcTestCase(TestCase):
loop = global_event_loop()
async def watch_pid():
- proc = spawn([sleep_binary, 9999], returnproc=True)
+ proc = spawn([sleep_binary, "9999"], returnproc=True)
proc.terminate()
self.assertEqual(await proc.wait(), -signal.SIGTERM)
diff --git a/lib/portage/tests/resolver/test_depth.py b/lib/portage/tests/resolver/test_depth.py
index 9c5289f7d..ab5f8e7ec 100644
--- a/lib/portage/tests/resolver/test_depth.py
+++ b/lib/portage/tests/resolver/test_depth.py
@@ -1,4 +1,4 @@
-# Copyright 2011-2020 Gentoo Authors
+# Copyright 2011-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
from portage.tests import TestCase
@@ -318,6 +318,12 @@ class ResolverDepthTestCase(TestCase):
"sys-fs/udev-164",
],
),
+ ResolverPlaygroundTestCase(
+ ["@world"],
+ options={"--emptytree": True, "--exclude": ["dev-libs/B"]},
+ success=True,
+ mergelist=["dev-libs/C-2", "dev-libs/A-2"],
+ ),
)
playground = ResolverPlayground(
diff --git a/lib/portage/tests/resolver/test_emptytree_reinstall_unsatisfiability.py b/lib/portage/tests/resolver/test_emptytree_reinstall_unsatisfiability.py
new file mode 100644
index 000000000..fcdc01d7f
--- /dev/null
+++ b/lib/portage/tests/resolver/test_emptytree_reinstall_unsatisfiability.py
@@ -0,0 +1,137 @@
+# Copyright 2024 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.tests import TestCase
+from portage.tests.resolver.ResolverPlayground import (
+ ResolverPlayground,
+ ResolverPlaygroundTestCase,
+)
+
+
+class EmptytreeReinstallUnsatisfiabilityTestCase(TestCase):
+ def testEmptytreeReinstallUnsatisfiability(self):
+ """
+ Tests to check if emerge fails and complains when --emptytree
+ package dependency graph reinstall is unsatisfied, even if the already
+ installed packages successfully satisfy the dependency tree.
+
+ See bug #651018 where emerge silently skips package
+ reinstalls because of unsatisfied use flag requirements.
+ """
+ ebuilds = {
+ "dev-libs/A-1": {
+ "DEPEND": "dev-libs/B",
+ "RDEPEND": "dev-libs/B",
+ "EAPI": "2",
+ },
+ "dev-libs/B-1": {
+ "DEPEND": "dev-libs/C[foo]",
+ "RDEPEND": "dev-libs/C[foo]",
+ "EAPI": "2",
+ },
+ "dev-libs/C-1": {
+ "IUSE": "foo",
+ "EAPI": "2",
+ },
+ "dev-libs/X-1": {
+ "DEPEND": "dev-libs/Y[-baz]",
+ "RDEPEND": "dev-libs/Y[-baz]",
+ "EAPI": "2",
+ },
+ "dev-libs/Y-1": {
+ "IUSE": "baz",
+ "EAPI": "2",
+ },
+ "dev-libs/Z-1": {
+ "DEPEND": "dev-libs/W",
+ "RDEPEND": "dev-libs/W",
+ "EAPI": "2",
+ },
+ "dev-libs/W-1": {
+ "EAPI": "2",
+ },
+ }
+
+ installed = {
+ "dev-libs/A-1": {
+ "DEPEND": "dev-libs/B",
+ "RDEPEND": "dev-libs/B",
+ "EAPI": "2",
+ },
+ "dev-libs/B-1": {
+ "DEPEND": "dev-libs/C[foo]",
+ "RDEPEND": "dev-libs/C[foo]",
+ "EAPI": "2",
+ },
+ "dev-libs/C-1": {
+ "IUSE": "foo",
+ "USE": "foo",
+ "EAPI": "2",
+ },
+ "dev-libs/X-1": {
+ "DEPEND": "dev-libs/Y[-baz]",
+ "RDEPEND": "dev-libs/Y[-baz]",
+ "EAPI": "2",
+ },
+ "dev-libs/Y-1": {
+ "IUSE": "baz",
+ "USE": "-baz",
+ "EAPI": "2",
+ },
+ "dev-libs/Z-1": {
+ "DEPEND": "dev-libs/W",
+ "RDEPEND": "dev-libs/W",
+ "EAPI": "2",
+ },
+ "dev-libs/W-1": {
+ "EAPI": "2",
+ },
+ }
+
+ user_config = {
+ "package.use": ("dev-libs/Y baz",),
+ "package.mask": ("dev-libs/W",),
+ }
+
+ world = ["dev-libs/X"]
+
+ test_cases = (
+ ResolverPlaygroundTestCase(
+ ["dev-libs/A"],
+ options={"--emptytree": True},
+ success=False,
+ mergelist=["dev-libs/C-1", "dev-libs/B-1", "dev-libs/A-1"],
+ use_changes={"dev-libs/C-1": {"foo": True}},
+ ),
+ ResolverPlaygroundTestCase(
+ ["dev-libs/A"],
+ options={"--emptytree": True, "--exclude": ["dev-libs/C"]},
+ success=True,
+ mergelist=["dev-libs/B-1", "dev-libs/A-1"],
+ ),
+ ResolverPlaygroundTestCase(
+ ["@world"],
+ options={"--emptytree": True},
+ success=False,
+ mergelist=["dev-libs/Y-1", "dev-libs/X-1"],
+ use_changes={"dev-libs/Y-1": {"baz": False}},
+ ),
+ ResolverPlaygroundTestCase(
+ ["dev-libs/Z"],
+ options={"--emptytree": True},
+ success=False,
+ ),
+ )
+
+ playground = ResolverPlayground(
+ ebuilds=ebuilds,
+ installed=installed,
+ user_config=user_config,
+ world=world,
+ )
+ try:
+ for test_case in test_cases:
+ playground.run_TestCase(test_case)
+ self.assertEqual(test_case.test_success, True, test_case.fail_msg)
+ finally:
+ playground.cleanup()
diff --git a/lib/portage/tests/resolver/test_useflags.py b/lib/portage/tests/resolver/test_useflags.py
index 86684f7f2..142a31c7f 100644
--- a/lib/portage/tests/resolver/test_useflags.py
+++ b/lib/portage/tests/resolver/test_useflags.py
@@ -1,4 +1,4 @@
-# Copyright 2014 Gentoo Foundation
+# Copyright 2014-2024 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import sys
@@ -292,8 +292,8 @@ class UseFlagsTestCase(TestCase):
"--usepkg": True,
},
success=False,
- mergelist=["[binary]dev-libs/A-2", "dev-libs/B-1"],
- slot_collision_solutions=[],
+ mergelist=None,
+ slot_collision_solutions=None,
),
)
diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py
index 987b41af2..4a6d08169 100644
--- a/lib/portage/tests/util/test_socks5.py
+++ b/lib/portage/tests/util/test_socks5.py
@@ -1,6 +1,7 @@
-# Copyright 2019-2021 Gentoo Authors
+# Copyright 2019-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
+import asyncio
import functools
import shutil
import socket
@@ -10,8 +11,9 @@ import time
import portage
from portage.tests import TestCase
-from portage.util._eventloop.global_event_loop import global_event_loop
from portage.util import socks5
+from portage.util.futures.executor.fork import ForkExecutor
+from portage.util._eventloop.global_event_loop import global_event_loop
from portage.const import PORTAGE_BIN_PATH
from http.server import BaseHTTPRequestHandler, HTTPServer
@@ -88,18 +90,20 @@ class AsyncHTTPServerTestCase(TestCase):
if f is not None:
f.close()
- def test_http_server(self):
+ async def _test_http_server(self):
+ asyncio.run(self._test_http_server())
+
+ async def _test_http_server(self):
host = "127.0.0.1"
content = b"Hello World!\n"
path = "/index.html"
- loop = global_event_loop()
+
+ loop = asyncio.get_running_loop()
for i in range(2):
with AsyncHTTPServer(host, {path: content}, loop) as server:
for j in range(2):
- result = loop.run_until_complete(
- loop.run_in_executor(
- None, self._fetch_directly, host, server.server_port, path
- )
+ result = await loop.run_in_executor(
+ None, self._fetch_directly, host, server.server_port, path
)
self.assertEqual(result, content)
@@ -177,15 +181,20 @@ class Socks5ServerTestCase(TestCase):
return f.read()
def test_socks5_proxy(self):
- loop = global_event_loop()
+ asyncio.run(self._test_socks5_proxy())
+
+ async def _test_socks5_proxy(self):
+ loop = asyncio.get_running_loop()
host = "127.0.0.1"
content = b"Hello World!"
path = "/index.html"
proxy = None
tempdir = tempfile.mkdtemp()
+ previous_exithandlers = portage.process._exithandlers
try:
+ portage.process._exithandlers = []
with AsyncHTTPServer(host, {path: content}, loop) as server:
settings = {
"PORTAGE_TMPDIR": tempdir,
@@ -193,20 +202,63 @@ class Socks5ServerTestCase(TestCase):
}
proxy = socks5.get_socks5_proxy(settings)
- loop.run_until_complete(socks5.proxy.ready())
-
- result = loop.run_until_complete(
- loop.run_in_executor(
- None,
- self._fetch_via_proxy,
- proxy,
- host,
- server.server_port,
- path,
- )
+ await socks5.proxy.ready()
+
+ result = await loop.run_in_executor(
+ None,
+ self._fetch_via_proxy,
+ proxy,
+ host,
+ server.server_port,
+ path,
)
self.assertEqual(result, content)
finally:
- socks5.proxy.stop()
+ try:
+ # Also run_exitfuncs to test atexit hook cleanup.
+ await socks5.proxy.stop()
+ self.assertNotEqual(portage.process._exithandlers, [])
+ portage.process.run_exitfuncs()
+ self.assertEqual(portage.process._exithandlers, [])
+ finally:
+ portage.process._exithandlers = previous_exithandlers
+ shutil.rmtree(tempdir)
+
+
+class Socks5ServerLoopCloseTestCase(TestCase):
+ """
+ For bug 925240, test that the socks5 proxy is automatically
+ terminated when the main event loop is closed, using a subprocess
+ for isolation.
+ """
+
+ def testSocks5ServerLoopClose(self):
+ asyncio.run(self._testSocks5ServerLoopClose())
+
+ async def _testSocks5ServerLoopClose(self):
+ loop = asyncio.get_running_loop()
+ self.assertEqual(
+ await loop.run_in_executor(
+ ForkExecutor(loop=loop), self._testSocks5ServerLoopCloseSubprocess
+ ),
+ True,
+ )
+
+ @staticmethod
+ def _testSocks5ServerLoopCloseSubprocess():
+ loop = global_event_loop()
+ tempdir = tempfile.mkdtemp()
+ try:
+ settings = {
+ "PORTAGE_TMPDIR": tempdir,
+ "PORTAGE_BIN_PATH": PORTAGE_BIN_PATH,
+ }
+
+ socks5.get_socks5_proxy(settings)
+ loop.run_until_complete(socks5.proxy.ready())
+ finally:
+ loop.close()
shutil.rmtree(tempdir)
+
+ return not socks5.proxy.is_running()
diff --git a/lib/portage/util/_async/SchedulerInterface.py b/lib/portage/util/_async/SchedulerInterface.py
index 43a42adff..485958491 100644
--- a/lib/portage/util/_async/SchedulerInterface.py
+++ b/lib/portage/util/_async/SchedulerInterface.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2021 Gentoo Authors
+# Copyright 2012-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import gzip
@@ -49,6 +49,13 @@ class SchedulerInterface(SlotObject):
for k in self._event_loop_attrs:
setattr(self, k, getattr(event_loop, k))
+ @property
+ def _loop(self):
+ """
+ Returns the real underlying asyncio loop.
+ """
+ return self._event_loop._loop
+
@staticmethod
def _return_false():
return False
diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index b9e96bb20..ee9e4c60e 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -1,8 +1,9 @@
-# Copyright 2018-2023 Gentoo Authors
+# Copyright 2018-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import os
import signal
+import threading
import asyncio as _real_asyncio
from asyncio.events import AbstractEventLoop as _AbstractEventLoop
@@ -14,6 +15,7 @@ except ImportError:
PidfdChildWatcher = None
import portage
+from portage.util import socks5
class AsyncioEventLoop(_AbstractEventLoop):
@@ -25,18 +27,14 @@ class AsyncioEventLoop(_AbstractEventLoop):
def __init__(self, loop=None):
loop = loop or _real_asyncio.get_event_loop()
self._loop = loop
- self.run_until_complete = (
- self._run_until_complete
- if portage._internal_caller
- else loop.run_until_complete
- )
+ self.run_until_complete = self._run_until_complete
self.call_soon = loop.call_soon
self.call_soon_threadsafe = loop.call_soon_threadsafe
self.call_later = loop.call_later
self.call_at = loop.call_at
self.is_running = loop.is_running
self.is_closed = loop.is_closed
- self.close = loop.close
+ self.close = self._close
self.create_future = (
loop.create_future
if hasattr(loop, "create_future")
@@ -55,10 +53,36 @@ class AsyncioEventLoop(_AbstractEventLoop):
self.get_debug = loop.get_debug
self._wakeup_fd = -1
self._child_watcher = None
+ # Used to drop recursive calls to _close.
+ self._closing = False
+ # Initialized in _run_until_complete.
+ self._is_main = None
if portage._internal_caller:
loop.set_exception_handler(self._internal_caller_exception_handler)
+ def _close(self):
+ """
+ Before closing the main loop, run portage.process.run_exitfuncs()
+ with the event loop running so that anything attached can clean
+ itself up (like the socks5 ProxyManager for bug 925240).
+ """
+ if not (self._closing or self.is_closed()):
+ self._closing = True
+ if self._is_main:
+ self.run_until_complete(self._close_main())
+ self._loop.close()
+ self._closing = False
+
+ async def _close_main(self):
+ # Even though this has an exit hook, invoke it here so that
+ # we can properly wait for it and avoid messages like this:
+ # [ERROR] Task was destroyed but it is pending!
+ if socks5.proxy.is_running():
+ await socks5.proxy.stop()
+
+ portage.process.run_exitfuncs()
+
@staticmethod
def _internal_caller_exception_handler(loop, context):
"""
@@ -139,6 +163,12 @@ class AsyncioEventLoop(_AbstractEventLoop):
In order to avoid potential interference with API consumers, this
implementation is only used when portage._internal_caller is True.
"""
+ if self._is_main is None:
+ self._is_main = threading.current_thread() is threading.main_thread()
+
+ if not portage._internal_caller:
+ return self._loop.run_until_complete(future)
+
if self._wakeup_fd != -1:
signal.set_wakeup_fd(self._wakeup_fd)
self._wakeup_fd = -1
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
index 22241f335..4eecc46a8 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -325,13 +325,37 @@ def _safe_loop():
def _get_running_loop():
+ """
+ This calls the real asyncio get_running_loop() and wraps that with
+ portage's internal AsyncioEventLoop wrapper. If there is no running
+ asyncio event loop but portage has a reference to another running
+ loop in this thread, then use that instead.
+
+ This behavior enables portage internals to use the real asyncio.run
+ while remaining compatible with internal code that does not use the
+ real asyncio.run.
+ """
+ try:
+ _loop = _real_asyncio.get_running_loop()
+ except RuntimeError:
+ _loop = None
+
with _thread_weakrefs.lock:
if _thread_weakrefs.pid == portage.getpid():
try:
loop = _thread_weakrefs.loops[threading.get_ident()]
except KeyError:
- return None
- return loop if loop.is_running() else None
+ pass
+ else:
+ if _loop is loop._loop:
+ return loop
+ elif _loop is None:
+ return loop if loop.is_running() else None
+
+ # If _loop it not None here it means it was probably a temporary
+ # loop created by asyncio.run, so we don't try to cache it, and
+ # just return a temporary wrapper.
+ return None if _loop is None else _AsyncioEventLoop(loop=_loop)
def _thread_weakrefs_atexit():
diff --git a/lib/portage/util/socks5.py b/lib/portage/util/socks5.py
index 6c68ff410..f8fcdf9fc 100644
--- a/lib/portage/util/socks5.py
+++ b/lib/portage/util/socks5.py
@@ -2,15 +2,23 @@
# Copyright 2015-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
+import asyncio
import errno
import os
import socket
+from typing import Union
+
+import portage
+
+portage.proxy.lazyimport.lazyimport(
+ globals(),
+ "portage.util._eventloop.global_event_loop:global_event_loop",
+)
import portage.data
from portage import _python_interpreter
from portage.data import portage_gid, portage_uid, userpriv_groups
from portage.process import atexit_register, spawn
-from portage.util.futures import asyncio
class ProxyManager:
@@ -57,23 +65,41 @@ class ProxyManager:
**spawn_kwargs,
)
- def stop(self):
+ def stop(self) -> Union[None, asyncio.Future]:
"""
Stop the SOCKSv5 server.
+
+ If there is a running asyncio event loop then asyncio.Future is
+ returned which should be used to wait for the server process
+ to exit.
"""
+ future = None
+ try:
+ loop = asyncio.get_running_loop()
+ except RuntimeError:
+ loop = None
if self._proc is not None:
self._proc.terminate()
- loop = asyncio.get_event_loop()
- if self._proc_waiter is None:
- self._proc_waiter = asyncio.ensure_future(self._proc.wait(), loop)
- if loop.is_running():
- self._proc_waiter.add_done_callback(lambda future: future.result())
+ if loop is None:
+ # In this case spawn internals would have used
+ # portage's global loop when attaching a waiter to
+ # self._proc, so we are obligated to use that.
+ global_event_loop().run_until_complete(self._proc.wait())
else:
- loop.run_until_complete(self._proc_waiter)
+ if self._proc_waiter is None:
+ self._proc_waiter = asyncio.ensure_future(
+ self._proc.wait(), loop=loop
+ )
+ future = asyncio.shield(self._proc_waiter)
+
+ if loop is not None and future is None:
+ future = loop.create_future()
+ future.set_result(None)
self.socket_path = None
self._proc = None
self._proc_waiter = None
+ return future
def is_running(self):
"""