blob: ed01bd50a9ffd712a42cb8cc4278fcb049a88cea [file] [log] [blame] [edit]
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
"""Invasive patches for coverage.py."""
from __future__ import annotations
import atexit
import contextlib
import os
import site
from pathlib import Path
from typing import TYPE_CHECKING, Any, NoReturn
from coverage import env
from coverage.debug import NoDebugging, DevNullDebug
from coverage.exceptions import ConfigError, CoverageException
if TYPE_CHECKING:
from coverage import Coverage
from coverage.config import CoverageConfig
from coverage.types import TDebugCtl
def apply_patches(
cov: Coverage,
config: CoverageConfig,
debug: TDebugCtl,
*,
make_pth_file: bool = True,
) -> None:
"""Apply invasive patches requested by `[run] patch=`."""
debug = debug if debug.should("patch") else DevNullDebug()
for patch in sorted(set(config.patch)):
match patch:
case "_exit":
_patch__exit(cov, debug)
case "execv":
_patch_execv(cov, config, debug)
case "fork":
_patch_fork(debug)
case "subprocess":
_patch_subprocess(config, debug, make_pth_file)
case _:
raise ConfigError(f"Unknown patch {patch!r}")
def _patch__exit(cov: Coverage, debug: TDebugCtl) -> None:
"""Patch os._exit."""
debug.write("Patching _exit")
old_exit = os._exit
def coverage_os_exit_patch(status: int) -> NoReturn:
with contextlib.suppress(Exception):
debug.write(f"Using _exit patch with {cov = }")
with contextlib.suppress(Exception):
cov.save()
old_exit(status)
os._exit = coverage_os_exit_patch
def _patch_execv(cov: Coverage, config: CoverageConfig, debug: TDebugCtl) -> None:
"""Patch the execv family of functions."""
if env.WINDOWS:
raise CoverageException("patch=execv isn't supported yet on Windows.")
debug.write("Patching execv")
def make_execv_patch(fname: str, old_execv: Any) -> Any:
def coverage_execv_patch(*args: Any, **kwargs: Any) -> Any:
with contextlib.suppress(Exception):
debug.write(f"Using execv patch for {fname} with {cov = }")
with contextlib.suppress(Exception):
cov.save()
if fname.endswith("e"):
# Assume the `env` argument is passed positionally.
new_env = args[-1]
# Pass our configuration in the new environment.
new_env["COVERAGE_PROCESS_CONFIG"] = config.serialize()
if env.TESTING:
# The subprocesses need to use the same core as the main process.
new_env["COVERAGE_CORE"] = os.getenv("COVERAGE_CORE")
# When testing locally, we need to honor the pyc file location
# or they get written to the .tox directories and pollute the
# next run with a different core.
if (cache_prefix := os.getenv("PYTHONPYCACHEPREFIX")) is not None:
new_env["PYTHONPYCACHEPREFIX"] = cache_prefix
# Without this, it fails on PyPy and Ubuntu.
new_env["PATH"] = os.getenv("PATH")
old_execv(*args, **kwargs)
return coverage_execv_patch
# All the exec* and spawn* functions eventually call execv or execve.
os.execv = make_execv_patch("execv", os.execv)
os.execve = make_execv_patch("execve", os.execve)
def _patch_fork(debug: TDebugCtl) -> None:
"""Ensure Coverage is properly reset after a fork."""
from coverage.control import _after_fork_in_child
if env.WINDOWS:
raise CoverageException("patch=fork isn't supported yet on Windows.")
debug.write("Patching fork")
os.register_at_fork(after_in_child=_after_fork_in_child)
def _patch_subprocess(config: CoverageConfig, debug: TDebugCtl, make_pth_file: bool) -> None:
"""Write .pth files and set environment vars to measure subprocesses."""
debug.write("Patching subprocess")
if make_pth_file:
pth_files = create_pth_files(debug)
def delete_pth_files() -> None:
for p in pth_files:
debug.write(f"Deleting subprocess .pth file: {str(p)!r}")
p.unlink(missing_ok=True)
atexit.register(delete_pth_files)
assert config.config_file is not None
os.environ["COVERAGE_PROCESS_CONFIG"] = config.serialize()
# Writing .pth files is not obvious. On Windows, getsitepackages() returns two
# directories. A .pth file in the first will be run, but coverage isn't
# importable yet. We write into all the places we can, but with defensive
# import code.
PTH_CODE = """\
try:
import coverage
except:
pass
else:
coverage.process_startup()
"""
PTH_TEXT = f"import sys; exec({PTH_CODE!r})\n"
def create_pth_files(debug: TDebugCtl = NoDebugging()) -> list[Path]:
"""Create .pth files for measuring subprocesses."""
pth_files = []
for pth_dir in site.getsitepackages():
pth_file = Path(pth_dir) / f"subcover_{os.getpid()}.pth"
try:
if debug.should("patch"):
debug.write(f"Writing subprocess .pth file: {str(pth_file)!r}")
pth_file.write_text(PTH_TEXT, encoding="utf-8")
except OSError: # pragma: cant happen
continue
else:
pth_files.append(pth_file)
return pth_files