| # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 |
| # For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt |
| |
| """Code parsing for coverage.py.""" |
| |
| from __future__ import annotations |
| |
| import ast |
| import collections |
| import functools |
| import os |
| import re |
| import token |
| import tokenize |
| from collections.abc import Iterable, Sequence |
| from dataclasses import dataclass |
| from types import CodeType |
| from typing import Callable, Optional, Protocol, cast |
| |
| from coverage import env |
| from coverage.bytecode import code_objects |
| from coverage.debug import short_stack |
| from coverage.exceptions import NoSource, NotPython |
| from coverage.misc import isolate_module, nice_pair |
| from coverage.phystokens import generate_tokens |
| from coverage.types import TArc, TLineNo |
| |
| os = isolate_module(os) |
| |
| |
class PythonParser:
    """Parse code to find executable lines, excluded lines, etc.

    This information is all based on static analysis: no code execution is
    involved.

    """

    def __init__(
        self,
        text: str | None = None,
        filename: str | None = None,
        exclude: str | None = None,
    ) -> None:
        """
        Source can be provided as `text`, the text itself, or `filename`, from
        which the text will be read.  Excluded lines are those that match
        `exclude`, a regex string.

        Raises `NoSource` if the text has to be read from `filename` and the
        read fails with an OSError.

        """
        assert text or filename, "PythonParser needs either text or filename"
        self.filename = filename or "<code>"
        if text is not None:
            self.text: str = text
        else:
            from coverage.python import get_python_source

            try:
                self.text = get_python_source(self.filename)
            except OSError as err:
                raise NoSource(f"No source for code: '{self.filename}': {err}") from err

        self.exclude = exclude

        # The parsed AST of the text.
        self._ast_root: ast.AST | None = None

        # The normalized line numbers of the statements in the code. Exclusions
        # are taken into account, and statements are adjusted to their first
        # lines.
        self.statements: set[TLineNo] = set()

        # The normalized line numbers of the excluded lines in the code,
        # adjusted to their first lines.
        self.excluded: set[TLineNo] = set()

        # The raw_* attributes are only used in this class, and in
        # lab/parser.py to show how this class is working.

        # The line numbers that start statements, as reported by the line
        # number table in the bytecode.
        self.raw_statements: set[TLineNo] = set()

        # The raw line numbers of excluded lines of code, as marked by pragmas.
        self.raw_excluded: set[TLineNo] = set()

        # The line numbers of docstring lines.
        self.raw_docstrings: set[TLineNo] = set()

        # Internal detail, used by lab/parser.py.
        self.show_tokens = False

        # A dict mapping line numbers to lexical statement starts for
        # multi-line statements.
        self.multiline_map: dict[TLineNo, TLineNo] = {}

        # Lazily-created arc data, and missing arc descriptions.
        self._all_arcs: set[TArc] | None = None
        self._missing_arc_fragments: TArcFragments | None = None
        self._with_jump_fixers: dict[TArc, tuple[TArc, TArc]] = {}

    def lines_matching(self, regex: str) -> set[TLineNo]:
        """Find the lines matching a regex.

        Returns a set of line numbers, the lines that contain a match for
        `regex`. The entire line needn't match, just a part of it.
        Handles multiline regex patterns.

        Line numbers are passed through `self.multiline_map`, so a match on a
        continuation line is reported as the first line of its statement once
        that map has been populated.

        """
        matches: set[TLineNo] = set()

        # Count newlines incrementally from the previous match instead of
        # from the start of the text for every match.
        last_start = 0
        last_start_line = 0
        for match in re.finditer(regex, self.text, flags=re.MULTILINE):
            start, end = match.span()
            start_line = last_start_line + self.text.count("\n", last_start, start)
            end_line = last_start_line + self.text.count("\n", last_start, end)
            # The *_line values are 0-based newline counts; line numbers are
            # 1-based, hence the +1 (and +2 for the exclusive range end).
            matches.update(
                self.multiline_map.get(i, i) for i in range(start_line + 1, end_line + 2)
            )
            last_start = start
            last_start_line = start_line
        return matches

    def _raw_parse(self) -> None:
        """Parse the source to find the interesting facts about its lines.

        A handful of attributes are updated: `raw_excluded`, `excluded`,
        `multiline_map`, `raw_statements` and `raw_docstrings`.

        `self._ast_root` must already have been set (see `parse_source`).

        """
        # Find lines which match an exclusion pattern.
        if self.exclude:
            self.raw_excluded = self.lines_matching(self.exclude)
        self.excluded = set(self.raw_excluded)

        # The current number of indents.
        indent: int = 0
        # An exclusion comment will exclude an entire clause at this indent.
        exclude_indent: int = 0
        # Are we currently excluding lines?
        excluding: bool = False
        # The line number of the first line in a multi-line statement.
        first_line: int = 0
        # Is the file empty?
        empty: bool = True
        # Parenthesis (and bracket) nesting level.
        nesting: int = 0

        assert self.text is not None
        tokgen = generate_tokens(self.text)
        for toktype, ttext, (slineno, _), (elineno, _), ltext in tokgen:
            if self.show_tokens:  # pragma: debugging
                print(
                    "%10s %5s %-20r %r"
                    % (
                        tokenize.tok_name.get(toktype, toktype),
                        nice_pair((slineno, elineno)),
                        ttext,
                        ltext,
                    )
                )
            if toktype == token.INDENT:
                indent += 1
            elif toktype == token.DEDENT:
                indent -= 1
            elif toktype == token.OP:
                if ttext == ":" and nesting == 0:
                    should_exclude = self.excluded.intersection(range(first_line, elineno + 1))
                    if not excluding and should_exclude:
                        # Start excluding a suite.  We trigger off of the colon
                        # token so that the #pragma comment will be recognized on
                        # the same line as the colon.
                        self.excluded.add(elineno)
                        exclude_indent = indent
                        excluding = True
                elif ttext in "([{":
                    nesting += 1
                elif ttext in ")]}":
                    nesting -= 1
            elif toktype == token.NEWLINE:
                if first_line and elineno != first_line:
                    # We're at the end of a line, and we've ended on a
                    # different line than the first line of the statement,
                    # so record a multi-line range.
                    for lineno in range(first_line, elineno + 1):
                        self.multiline_map[lineno] = first_line
                first_line = 0

            if ttext.strip() and toktype != tokenize.COMMENT:
                # A non-white-space token.
                empty = False
                if not first_line:
                    # The token is not white space, and is the first in a statement.
                    first_line = slineno
                    # Check whether to end an excluded suite.
                    if excluding and indent <= exclude_indent:
                        excluding = False
                    if excluding:
                        self.excluded.add(elineno)

        # Find the starts of the executable statements.
        if not empty:
            byte_parser = ByteParser(self.text, filename=self.filename)
            self.raw_statements.update(byte_parser._find_statements())

        self.excluded = self.first_lines(self.excluded)

        # AST lets us find classes, docstrings, and decorator-affected
        # functions and classes.
        assert self._ast_root is not None
        for node in ast.walk(self._ast_root):
            # Find docstrings.
            if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef, ast.Module)):
                if node.body:
                    first = node.body[0]
                    if (
                        isinstance(first, ast.Expr)
                        and isinstance(first.value, ast.Constant)
                        and isinstance(first.value.value, str)
                    ):
                        self.raw_docstrings.update(
                            range(first.lineno, cast(int, first.end_lineno) + 1)
                        )
            # Exclusions carry from decorators and signatures to the bodies of
            # functions and classes.
            if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)):
                first_line = min((d.lineno for d in node.decorator_list), default=node.lineno)
                if self.excluded.intersection(range(first_line, node.lineno + 1)):
                    self.excluded.update(range(first_line, cast(int, node.end_lineno) + 1))

    # NOTE: lru_cache on a method keys its entries on `self`, so cached entries
    # keep their parser instance alive until they are evicted.
    @functools.lru_cache(maxsize=1000)
    def first_line(self, lineno: TLineNo) -> TLineNo:
        """Return the first line number of the statement including `lineno`.

        Negative line numbers are mapped by their absolute value and stay
        negative.

        """
        if lineno < 0:
            lineno = -self.multiline_map.get(-lineno, -lineno)
        else:
            lineno = self.multiline_map.get(lineno, lineno)
        return lineno

    def first_lines(self, linenos: Iterable[TLineNo]) -> set[TLineNo]:
        """Map the line numbers in `linenos` to the correct first line of the
        statement.

        Returns a set of the first lines.

        """
        return {self.first_line(l) for l in linenos}

    def translate_lines(self, lines: Iterable[TLineNo]) -> set[TLineNo]:
        """Implement `FileReporter.translate_lines`."""
        return self.first_lines(lines)

    def translate_arcs(self, arcs: Iterable[TArc]) -> set[TArc]:
        """Implement `FileReporter.translate_arcs`."""
        return {(self.first_line(a), self.first_line(b)) for (a, b) in self.fix_with_jumps(arcs)}

    def parse_source(self) -> None:
        """Parse source text to find executable lines, excluded lines, etc.

        Sets the .excluded and .statements attributes, normalized to the first
        line of multi-line statements.

        Raises `NotPython` if the text can't be parsed or tokenized.

        """
        try:
            self._ast_root = ast.parse(self.text)
            self._raw_parse()
        except (tokenize.TokenError, IndentationError, SyntaxError) as err:
            if hasattr(err, "lineno"):
                lineno = err.lineno  # IndentationError
            else:
                lineno = err.args[1][0]  # TokenError
            raise NotPython(
                f"Couldn't parse '{self.filename}' as Python source: "
                + f"{err.args[0]!r} at line {lineno}",
            ) from err

        ignore = self.excluded | self.raw_docstrings
        starts = self.raw_statements - ignore
        self.statements = self.first_lines(starts) - ignore

    def arcs(self) -> set[TArc]:
        """Get information about the arcs available in the code.

        Returns a set of line number pairs.  Line numbers have been normalized
        to the first line of multi-line statements.

        """
        if self._all_arcs is None:
            self._analyze_ast()
            assert self._all_arcs is not None
        return self._all_arcs

    def _analyze_ast(self) -> None:
        """Run the AstArcAnalyzer and save its results.

        `_all_arcs` is the set of arcs in the code.

        `_with_jump_fixers` and `_missing_arc_fragments` are also saved from
        the analyzer.

        """
        assert self._ast_root is not None
        aaa = AstArcAnalyzer(self.filename, self._ast_root, self.raw_statements, self.multiline_map)
        aaa.analyze()
        arcs = aaa.arcs
        self._with_jump_fixers = aaa.with_jump_fixers()
        if self._with_jump_fixers:
            arcs = self.fix_with_jumps(arcs)

        self._all_arcs = set()
        for l1, l2 in arcs:
            fl1 = self.first_line(l1)
            fl2 = self.first_line(l2)
            # An arc within a single (multi-line) statement isn't an arc.
            if fl1 != fl2:
                self._all_arcs.add((fl1, fl2))

        self._missing_arc_fragments = aaa.missing_arc_fragments

    def fix_with_jumps(self, arcs: Iterable[TArc]) -> set[TArc]:
        """Adjust arcs to fix jumps leaving `with` statements.

        Consider this code:

            with open("/tmp/test", "w") as f1:
                a = 2
                b = 3
            print(4)

        In 3.10+, we get traces for lines 1, 2, 3, 1, 4.  But we want to present
        it to the user as if it had been 1, 2, 3, 4.  The arc 3->1 should be
        replaced with 3->4, and 1->4 should be removed.

        For this code, the fixers dict is {(3, 1): ((1, 4), (3, 4))}.  The key
        is the actual measured arc from the end of the with block back to the
        start of the with-statement.  The values are start_next (the with
        statement to the next statement after the with), and end_next (the end
        of the with-statement to the next statement after the with).

        With nested with-statements, we have to trace through a few levels to
        correct a longer chain of arcs.

        `arcs` may be any iterable, including a one-shot iterator: it is
        copied into a set before use.  Returns a new set; `arcs` itself is
        not modified.

        """
        # Materialize once: `arcs` is needed both for the loop and for the
        # final result, and a generator would be empty the second time.
        all_arcs = set(arcs)
        to_remove = set()
        to_add = set()
        for arc in all_arcs:
            if arc in self._with_jump_fixers:
                end0 = arc[0]
                to_remove.add(arc)
                start_next, end_next = self._with_jump_fixers[arc]
                # Follow the chain outward through nested with-statements.
                while start_next in self._with_jump_fixers:
                    to_remove.add(start_next)
                    start_next, end_next = self._with_jump_fixers[start_next]
                    to_remove.add(end_next)
                to_add.add((end0, end_next[1]))
                to_remove.add(start_next)
        return (all_arcs | to_add) - to_remove

    # NOTE: lru_cache on a method keys its entries on `self`; every call on the
    # same parser returns the same (mutable) dict object.
    @functools.lru_cache
    def exit_counts(self) -> dict[TLineNo, int]:
        """Get a count of exits from each line.

        Excluded lines are excluded.

        """
        exit_counts: dict[TLineNo, int] = collections.defaultdict(int)
        for l1, l2 in self.arcs():
            assert l1 > 0, f"{l1=} should be greater than zero in {self.filename}"
            if l1 in self.excluded:
                # Don't report excluded lines as line numbers.
                continue
            if l2 in self.excluded:
                # Arcs to excluded lines shouldn't count.
                continue
            exit_counts[l1] += 1

        return exit_counts

    def _finish_action_msg(self, action_msg: str | None, end: TLineNo) -> str:
        """Apply some defaulting and formatting to an arc's description.

        A missing `action_msg` gets a default based on the sign of `end`;
        then "{lineno}" in the message is replaced with `end`.

        """
        if action_msg is None:
            if end < 0:
                action_msg = "jump to the function exit"
            else:
                action_msg = "jump to line {lineno}"
        action_msg = action_msg.format(lineno=end)
        return action_msg

    def missing_arc_description(self, start: TLineNo, end: TLineNo) -> str:
        """Provide an English sentence describing a missing arc."""
        if self._missing_arc_fragments is None:
            self._analyze_ast()
            assert self._missing_arc_fragments is not None

        # Use .get() so that looking up an unknown arc doesn't add an entry.
        fragment_pairs = self._missing_arc_fragments.get((start, end), [(None, None)])

        msgs = []
        for missing_cause_msg, action_msg in fragment_pairs:
            action_msg = self._finish_action_msg(action_msg, end)
            msg = f"line {start} didn't {action_msg}"
            if missing_cause_msg is not None:
                msg += f" because {missing_cause_msg.format(lineno=start)}"

            msgs.append(msg)

        return " or ".join(msgs)

    def arc_description(self, start: TLineNo, end: TLineNo) -> str:
        """Provide an English description of an arc's effect.

        Only the first recorded fragment pair for the arc is used.

        """
        if self._missing_arc_fragments is None:
            self._analyze_ast()
            assert self._missing_arc_fragments is not None

        fragment_pairs = self._missing_arc_fragments.get((start, end), [(None, None)])
        action_msg = self._finish_action_msg(fragment_pairs[0][1], end)
        return action_msg
| |
| |
| class ByteParser: |
| """Parse bytecode to understand the structure of code.""" |
| |
| def __init__( |
| self, |
| text: str, |
| code: CodeType | None = None, |
| filename: str | None = None, |
| ) -> None: |
| self.text = text |
| if code is not None: |
| self.code = code |
| else: |
| assert filename is not None |
| # We only get here if earlier ast parsing succeeded, so no need to |
| # catch errors. |
| self.code = compile(text, filename, "exec", dont_inherit=True) |
| |
| def child_parsers(self) -> Iterable[ByteParser]: |
| """Iterate over all the code objects nested within this one. |
| |
| The iteration includes `self` as its first value. |
| |
| We skip code objects named `__annotate__` since they are deferred |
| annotations that usually are never run. If there are errors in the |
| annotations, they will be caught by type checkers or other tools that |
| use annotations. |
| |
| """ |
| return ( |
| ByteParser(self.text, code=c) |
| for c in code_objects(self.code) |
| if c.co_name != "__annotate__" |
| ) |
| |
| def _line_numbers(self) -> Iterable[TLineNo]: |
| """Yield the line numbers possible in this code object. |
| |
| Uses co_lines() to produce a sequence: l0, l1, ... |
| """ |
| for _, _, line in self.code.co_lines(): |
| if line: |
| yield line |
| |
| def _find_statements(self) -> Iterable[TLineNo]: |
| """Find the statements in `self.code`. |
| |
| Produce a sequence of line numbers that start statements. Recurses |
| into all code objects reachable from `self.code`. |
| |
| """ |
| for bp in self.child_parsers(): |
| # Get all of the lineno information from this code. |
| yield from bp._line_numbers() |
| |
| |
| # |
| # AST analysis |
| # |
| |
| |
@dataclass(frozen=True, order=True)
class ArcStart:
    """Where an arc begins, and why it might not have been taken.

    `lineno` is the line number the arc starts from.

    `cause` is an English text fragment used as the `missing_cause_msg` for
    AstArcAnalyzer.missing_arc_fragments.  It describes why an arc wasn't
    executed, so it should read well in a sentence of the form,
    "Line 17 didn't run because {cause}."  The fragment can include "{lineno}"
    to have `lineno` interpolated into it.

    For example, this code::

        if something(x):        # line 1
            func(x)             # line 2
        more_stuff()            # line 3

    has two ArcStarts:

    - ArcStart(1, "the condition on line 1 was always true")
    - ArcStart(1, "the condition on line 1 was never true")

    The first is used for the arc from 1 to 3, giving a message like
    "line 1 didn't jump to line 3 because the condition on line 1 was always true."

    The second is used for the arc from 1 to 2, giving a message like
    "line 1 didn't jump to line 2 because the condition on line 1 was never true."

    Instances are immutable, hashable, and ordered by (lineno, cause).

    """

    lineno: TLineNo
    cause: str = ""
| |
| |
class TAddArcFn(Protocol):
    """The call signature of AstArcAnalyzer.add_arc(), for typing callbacks."""

    def __call__(
        self,
        start: TLineNo,
        end: TLineNo,
        missing_cause_msg: str | None = None,
        action_msg: str | None = None,
    ) -> None:
        """
        Record an arc from `start` to `end`.

        `missing_cause_msg` describes why the arc wasn't taken, if it wasn't
        taken.  For example, "the condition on line 10 was never true."

        `action_msg` describes what the arc does, like "jump to line 10" or
        "exit from function 'fooey'."

        """
| |
| |
# Maps an arc to a list of (missing_cause_msg, action_msg) pairs; either
# element of a pair may be None.
TArcFragments = dict[TArc, list[tuple[Optional[str], Optional[str]]]]
| |
| |
class Block:
    """
    Base class for the entries kept on the analyzer's block stack.

    Different kinds of blocks deal with exiting statements in their own ways.
    Each method here takes a set of exits and a callable `add_arc` function
    it can use to add arcs if needed.  A method returns True if it handled
    the exits, or False if the search should continue up the block stack.

    The base implementations handle nothing: they all return False.
    """

    # pylint: disable=unused-argument
    def process_break_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process break exits: not handled by default."""
        return False

    def process_continue_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process continue exits: not handled by default."""
        return False

    def process_raise_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process raise exits: not handled by default."""
        return False

    def process_return_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process return exits: not handled by default."""
        return False
| |
| |
class LoopBlock(Block):
    """The block-stack entry for a `for` or `while` loop."""

    def __init__(self, start: TLineNo) -> None:
        # The line number where the loop starts.
        self.start = start
        # A set of ArcStarts, the arcs from break statements exiting this loop.
        self.break_exits: set[ArcStart] = set()

    def process_break_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Collect the break exits on this loop, and report them handled."""
        self.break_exits.update(exits)
        return True

    def process_continue_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Add an arc from each continue back to the start of the loop."""
        loop_top = self.start
        for arc_start in exits:
            add_arc(arc_start.lineno, loop_top, arc_start.cause)
        return True
| |
| |
class FunctionBlock(Block):
    """The block-stack entry for a function definition."""

    def __init__(self, start: TLineNo, name: str) -> None:
        # The line number where the function starts.
        self.start = start
        # The name of the function.
        self.name = name

    def _add_function_exits(
        self,
        exits: set[ArcStart],
        add_arc: TAddArcFn,
        action_msg: str,
    ) -> bool:
        """Add an arc from each exit to the negated function start line."""
        function_exit = -self.start
        for arc_start in exits:
            add_arc(arc_start.lineno, function_exit, arc_start.cause, action_msg)
        return True

    def process_raise_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Raises leave the function: add arcs to the function exit."""
        return self._add_function_exits(
            exits,
            add_arc,
            f"except from function {self.name!r}",
        )

    def process_return_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Returns leave the function: add arcs to the function exit."""
        return self._add_function_exits(
            exits,
            add_arc,
            f"return from function {self.name!r}",
        )
| |
| |
class TryBlock(Block):
    """The block-stack entry for a `try` block."""

    def __init__(self, handler_start: TLineNo | None, final_start: TLineNo | None) -> None:
        # The line number of the first "except" handler, if any.
        self.handler_start = handler_start
        # The line number of the "finally:" clause, if any.
        self.final_start = final_start

    def process_raise_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Send raises to the first handler, if there is one.

        The exits are reported as handled whether or not there is a handler.
        """
        handler_line = self.handler_start
        if handler_line is not None:
            for arc_start in exits:
                add_arc(arc_start.lineno, handler_line, arc_start.cause)
        return True
| |
| |
| # TODO: Shouldn't the cause messages join with "and" instead of "or"? |
| |
| |
| def is_constant_test_expr(node: ast.AST) -> tuple[bool, bool]: |
| """Is this a compile-time constant test expression? |
| |
| We don't try to mimic all of CPython's optimizations. We just have to |
| handle the kinds of constant expressions people might actually use. |
| |
| """ |
| match node: |
| case ast.Constant(): |
| return True, bool(node.value) |
| case ast.Name(): |
| if node.id in ["True", "False", "None", "__debug__"]: |
| return True, eval(node.id) # pylint: disable=eval-used |
| case ast.UnaryOp(): |
| if isinstance(node.op, ast.Not): |
| is_constant, val = is_constant_test_expr(node.operand) |
| return is_constant, not val |
| case ast.BoolOp(): |
| rets = [is_constant_test_expr(v) for v in node.values] |
| is_constant = all(is_const for is_const, _ in rets) |
| if is_constant: |
| op = any if isinstance(node.op, ast.Or) else all |
| return True, op(v for _, v in rets) |
| return False, False |
| |
| |
| class AstArcAnalyzer: |
| """Analyze source text with an AST to find executable code paths. |
| |
| The .analyze() method does the work, and populates these attributes: |
| |
| `arcs`: a set of (from, to) pairs of the arcs possible in the code. |
| |
| `missing_arc_fragments`: a dict mapping (from, to) arcs to lists of |
| message fragments explaining why the arc is missing from execution:: |
| |
| { (start, end): [(missing_cause_msg, action_msg), ...], } |
| |
| For an arc starting from line 17, they should be usable to form complete |
| sentences like: "Line 17 didn't {action_msg} because {missing_cause_msg}". |
| |
| NOTE: Starting in July 2024, I've been whittling this down to only report |
| arcs that are part of true branches. It's not clear how far this work will |
| go. |
| |
| """ |
| |
def __init__(
    self,
    filename: str,
    root_node: ast.AST,
    statements: set[TLineNo],
    multiline: dict[TLineNo, TLineNo],
) -> None:
    """Set up an analysis of `root_node`, the AST parsed from `filename`.

    `statements` is a set of statement line numbers, and `multiline` maps
    line numbers to the first line of their statement.  The statements are
    stored after being mapped through `multiline`.

    """
    self.filename = filename
    self.root_node = root_node
    self.multiline = multiline
    self.statements = {multiline.get(lineno, lineno) for lineno in statements}

    # Turn on AST dumps with an environment variable.
    # $set_env.py: COVERAGE_AST_DUMP - Dump the AST nodes when parsing code.
    if int(os.getenv("COVERAGE_AST_DUMP", "0")):  # pragma: debugging
        # Dump the AST so that failing tests have helpful output.
        print(f"Statements: {self.statements}")
        print(f"Multiline map: {self.multiline}")
        print(ast.dump(self.root_node, include_attributes=True, indent=4))

    # The results of the analysis, and the stack of enclosing blocks used
    # while producing them.
    self.arcs: set[TArc] = set()
    self.missing_arc_fragments: TArcFragments = collections.defaultdict(list)
    self.block_stack: list[Block] = []

    # If `with` clauses jump to their start on the way out, we need
    # information to be able to skip over that jump.  We record the arcs
    # from `with` into the clause (with_entries), and the arcs from the
    # clause to the `with` (with_exits).
    self.current_with_starts: set[TLineNo] = set()
    self.all_with_starts: set[TLineNo] = set()
    self.with_entries: set[TArc] = set()
    self.with_exits: set[TArc] = set()

    # $set_env.py: COVERAGE_TRACK_ARCS - Trace possible arcs added while parsing code.
    track_arcs = os.getenv("COVERAGE_TRACK_ARCS", "0")
    self.debug = bool(int(track_arcs))
| |
def analyze(self) -> None:
    """Examine the AST tree from `self.root_node` to determine possible arcs.

    Every node in the tree is visited.  A node whose class name has a
    matching `_code_object__<name>` method is passed to that method; all
    other nodes are ignored here.

    """
    for ast_node in ast.walk(self.root_node):
        handler_name = f"_code_object__{type(ast_node).__name__}"
        handler = getattr(self, handler_name, None)
        if handler is None:
            continue
        handler(ast_node)
| |
def with_jump_fixers(self) -> dict[TArc, tuple[TArc, TArc]]:
    """Get a dict with data for fixing jumps out of with statements.

    Returns a dict.  The keys are arcs leaving a with-statement by jumping
    back to its start.  The values are pairs: first, the arc from the start
    to the next statement, then the arc that exits the with without going
    to the start.

    With-statements that have no arc other than their entries are skipped.

    """
    fixers = {}
    for start in self.all_with_starts:
        # Where can the with line go, other than into its own body?
        nexts = {
            dest
            for (src, dest) in self.arcs
            if src == start and (src, dest) not in self.with_entries
        }
        if not nexts:
            continue
        assert len(nexts) == 1, f"Expected one arc, got {nexts} with {start = }"
        (nxt,) = nexts
        # Every line that jumps back to this with line needs a fixer.
        for end, back_to in self.with_exits:
            if back_to == start:
                fixers[(end, start)] = ((start, nxt), (end, nxt))
    return fixers
| |
| # Code object dispatchers: _code_object__* |
| # |
| # These methods are used by analyze() as the start of the analysis. |
| # There is one for each construct with a code object. |
| |
def _code_object__Module(self, node: ast.Module) -> None:
    """Analyze a module: process its body, then add arcs out of the module."""
    start = self.line_for_node(node)
    if not node.body:
        # Empty module.
        self.add_arc(start, -start)
        return

    for arc_start in self.process_body(node.body):
        self.add_arc(arc_start.lineno, -start, arc_start.cause, "exit the module")
| |
def _code_object__FunctionDef(self, node: ast.FunctionDef) -> None:
    """Analyze a function body inside a FunctionBlock on the block stack."""
    function_block = FunctionBlock(start=self.line_for_node(node), name=node.name)
    self.block_stack.append(function_block)
    # Falling off the end of the body is treated as a return.
    body_exits = self.process_body(node.body)
    self.process_return_exits(body_exits)
    self.block_stack.pop()


_code_object__AsyncFunctionDef = _code_object__FunctionDef
| |
def _code_object__ClassDef(self, node: ast.ClassDef) -> None:
    """Analyze a class body, then add arcs from its exits out of the class."""
    class_exit = -self.line_for_node(node)
    for arc_start in self.process_body(node.body):
        self.add_arc(
            arc_start.lineno,
            class_exit,
            arc_start.cause,
            f"exit class {node.name!r}",
        )
| |
def add_arc(
    self,
    start: TLineNo,
    end: TLineNo,
    missing_cause_msg: str | None = None,
    action_msg: str | None = None,
) -> None:
    """Add an arc, including message fragments to use if it is missing.

    The arc is also recorded as a with-entry if `start` is one of the
    with-statements currently being processed.  Message fragments are only
    stored when at least one of the two messages is given.

    """
    arc = (start, end)
    if self.debug:  # pragma: debugging
        print(f"Adding possible arc: ({start}, {end}): {missing_cause_msg!r}, {action_msg!r}")
        print(short_stack(), end="\n\n")

    self.arcs.add(arc)
    if start in self.current_with_starts:
        self.with_entries.add(arc)

    has_message = not (missing_cause_msg is None and action_msg is None)
    if has_message:
        self.missing_arc_fragments[arc].append((missing_cause_msg, action_msg))
| |
def nearest_blocks(self) -> Iterable[Block]:
    """Iterate over the block stack from the innermost block outward."""
    stack = self.block_stack
    return reversed(stack)
| |
def line_for_node(self, node: ast.AST) -> TLineNo:
    """What is the right line number to use for this node?

    This dispatches to _line__Node functions where needed, and otherwise
    uses the node's own `lineno`.  The result is mapped through
    `self.multiline`.

    """
    handler: Callable[[ast.AST], TLineNo] | None = getattr(
        self, f"_line__{type(node).__name__}", None
    )
    if handler is None:
        line = node.lineno  # type: ignore[attr-defined]
    else:
        line = handler(node)
    return self.multiline.get(line, line)
| |
| # First lines: _line__* |
| # |
| # Dispatched by line_for_node, each method knows how to identify the first |
| # line number in the node, as Python will report it. |
| |
def _line_decorated(self, node: ast.FunctionDef) -> TLineNo:
    """Compute first line number for things that can be decorated (classes and functions).

    That is the line of the first decorator if there are any, otherwise the
    line of the node itself.
    """
    decorators = node.decorator_list
    return decorators[0].lineno if decorators else node.lineno
| |
def _line__Assign(self, node: ast.Assign) -> TLineNo:
    """An assignment uses the line of its right-hand side value."""
    assigned_value = node.value
    return self.line_for_node(assigned_value)
| |
| _line__ClassDef = _line_decorated |
| |
def _line__Dict(self, node: ast.Dict) -> TLineNo:
    """A dict literal uses the line of its first key, if it has one."""
    if not node.keys:
        return node.lineno

    first_key = node.keys[0]
    if first_key is None:
        # Unpacked dict literals `{**{"a":1}}` have None as the key,
        # use the value in that case.
        return node.values[0].lineno
    return first_key.lineno
| |
| _line__FunctionDef = _line_decorated |
| _line__AsyncFunctionDef = _line_decorated |
| |
def _line__List(self, node: ast.List) -> TLineNo:
    """A list literal uses the line of its first element, if it has one."""
    if not node.elts:
        return node.lineno
    return self.line_for_node(node.elts[0])
| |
def _line__Module(self, node: ast.Module) -> TLineNo:  # pylint: disable=unused-argument
    """A module always uses line 1, whatever it contains."""
    module_line = 1
    return module_line
| |
# The node types that just flow to the next node with no complications.
# These are AST node class names.  node_exits() accepts a node with no
# `_handle__*` method only if its name is listed here when env.TESTING is
# set; otherwise it raises RuntimeError.
OK_TO_DEFAULT = {
    "AnnAssign",
    "Assign",
    "Assert",
    "AugAssign",
    "Delete",
    "Expr",
    "Global",
    "Import",
    "ImportFrom",
    "Nonlocal",
    "Pass",
}
| |
def node_exits(self, node: ast.AST) -> set[ArcStart]:
    """Find the set of arc starts that exit this node.

    Return a set of ArcStarts, exits from this node to the next. Because a
    node represents an entire sub-tree (including its children), the exits
    from a node can be arbitrarily complex::

        if something(1):
            if other(2):
                doit(3)
            else:
                doit(5)

    There are three exits from line 1: they start at lines 1, 3 and 5.
    There are two exits from line 2: lines 3 and 5.

    The work is dispatched to a `_handle__<node class name>` method if
    there is one.

    """
    node_name = type(node).__name__
    handler: Callable[[ast.AST], set[ArcStart]] | None = getattr(
        self, f"_handle__{node_name}", None
    )
    if handler is not None:
        return handler(node)

    # No handler: either it's something that's ok to default (a simple
    # statement), or it's something we overlooked.
    if env.TESTING and node_name not in self.OK_TO_DEFAULT:
        raise RuntimeError(f"*** Unhandled: {node}")  # pragma: only failure

    # Default for simple statements: one exit from this node.
    return {ArcStart(self.line_for_node(node))}
| |
def process_body(
    self,
    body: Sequence[ast.AST],
    from_start: ArcStart | None = None,
    prev_starts: set[ArcStart] | None = None,
) -> set[ArcStart]:
    """Process the body of a compound statement.

    `body` is the body node to process.

    `from_start` is a single `ArcStart` that starts an arc into this body.
    `prev_starts` is a set of ArcStarts that can all be the start of arcs
    into this body.  Only one of `from_start` and `prev_starts` should be
    given.

    Records arcs within the body by calling `self.add_arc`.  Nodes whose
    line is not in `self.statements` are skipped entirely.

    Returns a set of ArcStarts, the exits from this body.

    """
    if prev_starts is not None:
        assert from_start is None
    elif from_start is not None:
        prev_starts = {from_start}
    else:
        prev_starts = set()

    # Loop over the nodes in the body, making arcs from each one's exits to
    # the next node.
    for statement in body:
        statement_line = self.line_for_node(statement)
        if statement_line in self.statements:
            for arc_start in prev_starts:
                self.add_arc(arc_start.lineno, statement_line, arc_start.cause)
            prev_starts = self.node_exits(statement)
    return prev_starts
| |
| # Exit processing: process_*_exits |
| # |
| # These functions process the four kinds of jump exits: break, continue, |
| # raise, and return. To figure out where an exit goes, we have to look at |
| # the block stack context. For example, a break will jump to the nearest |
| # enclosing loop block, or the nearest enclosing finally block, whichever |
| # is nearer. |
| |
def process_break_exits(self, exits: set[ArcStart]) -> None:
    """Add arcs due to jumps from `exits` being breaks.

    Blocks are asked nearest first; the search stops at the first block
    that reports it handled the exits.
    """
    any(
        block.process_break_exits(exits, self.add_arc)
        for block in self.nearest_blocks()
    )
| |
def process_continue_exits(self, exits: set[ArcStart]) -> None:
    """Hand the `continue` jumps in `exits` to the nearest block that takes them.

    Blocks from `self.nearest_blocks()` are asked in order; the search ends
    at the first one whose `process_continue_exits` returns a true value.
    """
    for block in self.nearest_blocks():  # pragma: always breaks
        handled = block.process_continue_exits(exits, self.add_arc)
        if handled:
            return
| |
def process_raise_exits(self, exits: set[ArcStart]) -> None:
    """Hand the `raise` jumps in `exits` to the nearest block that takes them.

    Blocks from `self.nearest_blocks()` are asked in order; the search ends
    at the first one whose `process_raise_exits` returns a true value, or
    when the blocks run out.
    """
    for block in self.nearest_blocks():
        handled = block.process_raise_exits(exits, self.add_arc)
        if handled:
            return
| |
def process_return_exits(self, exits: set[ArcStart]) -> None:
    """Hand the `return` jumps in `exits` to the nearest block that takes them.

    Blocks from `self.nearest_blocks()` are asked in order; the search ends
    at the first one whose `process_return_exits` returns a true value.
    """
    for block in self.nearest_blocks():  # pragma: always breaks
        handled = block.process_return_exits(exits, self.add_arc)
        if handled:
            return
| |
| # Node handlers: _handle__* |
| # |
| # Each handler deals with a specific AST node type, dispatched from |
| # node_exits. Handlers return the set of exits from that node, and can |
| # also call self.add_arc to record arcs they find. These functions mirror |
| # the Python semantics of each syntactic construct. See the docstring |
| # for node_exits to understand the concept of exits from a node. |
| # |
| # Every node type that represents a statement should have a handler, or it |
| # should be listed in OK_TO_DEFAULT. |
| |
def _handle__Break(self, node: ast.Break) -> set[ArcStart]:
    """Route a `break` through `process_break_exits`; it has no exits of its own."""
    line = self.line_for_node(node)
    jump = ArcStart(line, cause="the break on line {lineno} wasn't executed")
    self.process_break_exits({jump})
    # The jump has been handed off above, so nothing exits from this node.
    return set()
| |
def _handle_decorated(self, node: ast.FunctionDef) -> set[ArcStart]:
    """Add arcs for a definition that may carry decorators (class or function).

    When decorators are present, an arc is added between consecutive
    decorators that are on different lines, and one more from the last
    decorator's line to the definition line (`node.lineno`).

    Returns a single ArcStart at the definition line.
    """
    def_line: TLineNo = node.lineno
    decorators = node.decorator_list
    if decorators:
        prev_line: TLineNo | None = None
        for decorator in decorators:
            dec_line = self.line_for_node(decorator)
            # Decorators sharing a line don't need an arc between them.
            if prev_line is not None and prev_line != dec_line:
                self.add_arc(prev_line, dec_line)
            prev_line = dec_line
        assert prev_line is not None
        self.add_arc(prev_line, def_line)
    assert node.body, f"Oops: {node.body = } in {self.filename}@{node.lineno}"
    # The body is handled in collect_arcs.
    return {ArcStart(def_line)}

_handle__ClassDef = _handle_decorated
| |
def _handle__Continue(self, node: ast.Continue) -> set[ArcStart]:
    """Route a `continue` through `process_continue_exits`; it has no exits of its own."""
    line = self.line_for_node(node)
    jump = ArcStart(line, cause="the continue on line {lineno} wasn't executed")
    self.process_continue_exits({jump})
    # The jump has been handed off above, so nothing exits from this node.
    return set()
| |
def _handle__For(self, node: ast.For) -> set[ArcStart]:
    """Add arcs for a `for` loop and return the exits from it.

    The loop is anchored at the line of its iterable expression.  A
    `LoopBlock` for that line is on `self.block_stack` while the body is
    processed, and every exit from the body gets an arc back to that line.

    The returned exits are the block's `break_exits`, plus the exits of the
    `else` clause if there is one, or the loop line itself if there isn't.
    """
    top = self.line_for_node(node.iter)
    self.block_stack.append(LoopBlock(start=top))
    body_exits = self.process_body(
        node.body,
        from_start=ArcStart(top, cause="the loop on line {lineno} never started"),
    )
    # Reaching the end of the body means going around again.
    for body_exit in body_exits:
        self.add_arc(body_exit.lineno, top, body_exit.cause)

    loop_block = self.block_stack.pop()
    assert isinstance(loop_block, LoopBlock)
    exits = loop_block.break_exits
    finished = ArcStart(top, cause="the loop on line {lineno} didn't complete")
    if not node.orelse:
        # Without an else clause, finishing the loop exits from its top line.
        exits.add(finished)
    else:
        exits |= self.process_body(node.orelse, from_start=finished)
    return exits

_handle__AsyncFor = _handle__For
| |
# Function definitions, plain and async, use the same handler as every other
# decorated definition.
_handle__FunctionDef = _handle_decorated
_handle__AsyncFunctionDef = _handle_decorated
| |
def _handle__If(self, node: ast.If) -> set[ArcStart]:
    """Add arcs for an `if` statement and return the exits from it.

    Both branches start at the line of the test expression.  When
    `is_constant_test_expr` reports the test as constant, only the branch
    selected by its value is processed.
    """
    test_line = self.line_for_node(node.test)
    is_constant, value = is_constant_test_expr(node.test)
    takes_body = value or not is_constant
    takes_else = not (is_constant and value)

    exits: set[ArcStart] = set()
    if takes_body:
        into_body = ArcStart(test_line, cause="the condition on line {lineno} was never true")
        exits.update(self.process_body(node.body, from_start=into_body))
    if takes_else:
        into_else = ArcStart(test_line, cause="the condition on line {lineno} was always true")
        exits.update(self.process_body(node.orelse, from_start=into_else))
    return exits
| |
def _handle__Match(self, node: ast.Match) -> set[ArcStart]:
    """Add arcs for a `match` statement and return the exits from it.

    Each case's pattern line gets an arc from the line before it: the
    `match` line for the first case, the previous pattern line after that.
    Each case body is entered from its own pattern line.

    The returned exits are the exits of every case body.  The last pattern
    line is added as well, unless the last case is irrefutable (it always
    matches and has no guard), because only a refutable last case lets
    control leave the statement without running any case body.
    """
    last_start = self.line_for_node(node)
    exits: set[ArcStart] = set()
    for case in node.cases:
        case_start = self.line_for_node(case.pattern)
        self.add_arc(last_start, case_start, "the pattern on line {lineno} always matched")
        from_start = ArcStart(
            case_start,
            cause="the pattern on line {lineno} never matched",
        )
        exits |= self.process_body(case.body, from_start=from_start)
        last_start = case_start

    # Check whether the last case is a wildcard match.  Or-patterns (only
    # their last alternative can be irrefutable) and as-patterns can wrap
    # each other in any order, e.g. `case (1 | _) as x:`, so keep peeling
    # off whichever kind is outermost until neither applies.
    last_case = node.cases[-1]
    pattern = last_case.pattern
    while True:
        if isinstance(pattern, ast.MatchOr):
            pattern = pattern.patterns[-1]
        elif isinstance(pattern, ast.MatchAs) and pattern.pattern is not None:
            pattern = pattern.pattern
        else:
            break
    had_wildcard = (
        isinstance(pattern, ast.MatchAs)
        and pattern.pattern is None
        and last_case.guard is None
    )

    if not had_wildcard:
        # The last pattern can fail, leaving the statement from its line.
        exits.add(
            ArcStart(last_start, cause="the pattern on line {lineno} always matched"),
        )
    return exits
| |
def _handle__Raise(self, node: ast.Raise) -> set[ArcStart]:
    """Route a `raise` through `process_raise_exits`; it has no exits of its own."""
    line = self.line_for_node(node)
    jump = ArcStart(line, cause="the raise on line {lineno} wasn't executed")
    self.process_raise_exits({jump})
    # A `raise` jumps away, so nothing exits from this node.
    return set()
| |
def _handle__Return(self, node: ast.Return) -> set[ArcStart]:
    """Route a `return` through `process_return_exits`; it has no exits of its own."""
    line = self.line_for_node(node)
    jump = ArcStart(line, cause="the return on line {lineno} wasn't executed")
    self.process_return_exits({jump})
    # A `return` jumps away, so nothing exits from this node.
    return set()
| |
def _handle__Try(self, node: ast.Try) -> set[ArcStart]:
    """Add arcs for a `try` statement and return the exits from it.

    A `TryBlock` holding the line of the first `except` handler and the
    first line of the `finally` clause (each `None` when that part is
    absent) is on `self.block_stack` while the `try` body is processed.
    Without a `finally` clause the block is popped before the handlers and
    the `else` clause are processed; with one, it stays on the stack with
    its `handler_start` cleared until just before the `finally` body is
    processed.

    Returns the set of ArcStarts that leave the statement as a whole.

    """
    if node.handlers:
        handler_start = self.line_for_node(node.handlers[0])
    else:
        handler_start = None

    if node.finalbody:
        final_start = self.line_for_node(node.finalbody[0])
    else:
        final_start = None

    # This is true by virtue of Python syntax: have to have either except
    # or finally, or both.
    assert handler_start is not None or final_start is not None
    try_block = TryBlock(handler_start, final_start)
    self.block_stack.append(try_block)

    start = self.line_for_node(node)
    exits = self.process_body(node.body, from_start=ArcStart(start))

    # We're done with the `try` body, so this block no longer handles
    # exceptions. We keep the block so the `finally` clause can pick up
    # flows from the handlers and `else` clause.
    if node.finalbody:
        try_block.handler_start = None
    else:
        self.block_stack.pop()

    handler_exits: set[ArcStart] = set()

    if node.handlers:
        # Each handler body is entered from its own `except` line.
        for handler_node in node.handlers:
            handler_start = self.line_for_node(handler_node)
            from_cause = "the exception caught by line {lineno} didn't happen"
            from_start = ArcStart(handler_start, cause=from_cause)
            handler_exits |= self.process_body(handler_node.body, from_start=from_start)

    if node.orelse:
        # The `else` clause continues from the exits of the `try` body.
        exits = self.process_body(node.orelse, prev_starts=exits)

    # The handlers' exits join the exits gathered from the body (or `else`).
    exits |= handler_exits

    if node.finalbody:
        self.block_stack.pop()
        # Every exit gathered so far flows into the `finally` body.
        final_from = exits

        final_exits = self.process_body(node.finalbody, prev_starts=final_from)

        if exits:
            # The finally clause's exits are only exits for the try block
            # as a whole if the try block had some exits to begin with.
            exits = final_exits

    return exits
| |
def _handle__While(self, node: ast.While) -> set[ArcStart]:
    """Add arcs for a `while` loop and return the exits from it.

    The loop is anchored at the line of its test expression.  A `LoopBlock`
    for that line is on `self.block_stack` while the body is processed, and
    every exit from the body gets an arc back to that line.

    The returned exits are a copy of the block's `break_exits`, plus the
    exits of the `else` clause if there is one.  With no `else` clause the
    test line is itself an exit, unless `is_constant_test_expr` reports the
    test as constant.
    """
    top = self.line_for_node(node.test)
    is_constant, _ = is_constant_test_expr(node.test)
    self.block_stack.append(LoopBlock(start=top))
    body_exits = self.process_body(
        node.body,
        from_start=ArcStart(top, cause="the condition on line {lineno} was never true"),
    )
    # Reaching the end of the body means testing the condition again.
    for body_exit in body_exits:
        self.add_arc(body_exit.lineno, top, body_exit.cause)

    loop_block = self.block_stack.pop()
    assert isinstance(loop_block, LoopBlock)
    exits = set(loop_block.break_exits)
    test_failed = ArcStart(top, cause="the condition on line {lineno} was always true")
    if node.orelse:
        exits |= self.process_body(node.orelse, from_start=test_failed)
    elif not is_constant:
        # No `else` clause: the loop is left straight from its test line.
        exits.add(test_failed)
    return exits
| |
def _handle__With(self, node: ast.With) -> set[ArcStart]:
    """Add arcs for a `with` statement and return the exits from it.

    The statement's start lines are the lines of each item's context
    expression when `env.PYBEHAVIOR.exit_with_through_ctxmgr` is set, and
    just the statement's own line otherwise.  The body is entered from the
    last of them.

    If the body has exits, each one gets an arc back to that last start
    line (also recorded in `self.with_exits`), and that line becomes the
    only exit.  If the body has none, neither does the statement.
    """
    if env.PYBEHAVIOR.exit_with_through_ctxmgr:
        with_lines = [self.line_for_node(item.context_expr) for item in node.items]
    else:
        with_lines = [self.line_for_node(node)]
    for with_line in with_lines:
        self.current_with_starts.add(with_line)
        self.all_with_starts.add(with_line)

    last_line = with_lines[-1]
    body_exits = self.process_body(node.body, from_start=ArcStart(last_line))

    # Only the last start line is taken back out of the current set.
    self.current_with_starts.remove(last_line)
    if not body_exits:
        return body_exits

    for body_exit in body_exits:
        self.add_arc(body_exit.lineno, last_line)
        self.with_exits.add((body_exit.lineno, last_line))
    return {ArcStart(last_line)}

_handle__AsyncWith = _handle__With