| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479 | import contextlibimport ioimport osimport shleximport shutilimport sysimport tempfileimport typing as tfrom types import TracebackTypefrom . import formattingfrom . import termuifrom . import utilsfrom ._compat import _find_binary_readerif t.TYPE_CHECKING:    from .core import BaseCommandclass EchoingStdin:    def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:        self._input = input        self._output = output        self._paused = False    def __getattr__(self, x: str) -> t.Any:        return getattr(self._input, x)    def _echo(self, rv: bytes) -> bytes:        if not self._paused:            self._output.write(rv)        return rv    def read(self, n: int = -1) -> bytes:        return self._echo(self._input.read(n))    def read1(self, n: int = -1) -> bytes:        return self._echo(self._input.read1(n))  # type: ignore    def readline(self, n: int = -1) -> bytes:        return self._echo(self._input.readline(n))    def readlines(self) -> t.List[bytes]:        return [self._echo(x) for x in self._input.readlines()]    def __iter__(self) -> t.Iterator[bytes]:        return iter(self._echo(x) for x in self._input)    def __repr__(self) -> str:        return repr(self._input)@contextlib.contextmanagerdef _pause_echo(stream: t.Optional[EchoingStdin]) -> t.Iterator[None]:    if stream is None:        yield    else:        stream._paused = True        yield        stream._paused = Falseclass _NamedTextIOWrapper(io.TextIOWrapper):    def __init__(        self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any    ) -> None:        super().__init__(buffer, **kwargs)        self._name = name        self._mode = mode    @property    def name(self) -> str:        return self._name    @property    def mode(self) -> str:        return self._modedef make_input_stream(    input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]], charset: str) -> t.BinaryIO:    # Is already an input stream.    if hasattr(input, "read"):        rv = _find_binary_reader(t.cast(t.IO[t.Any], input))        if rv is not None:            return rv        raise TypeError("Could not find binary reader for input stream.")    if input is None:        input = b""    elif isinstance(input, str):        input = input.encode(charset)    return io.BytesIO(input)class Result:    """Holds the captured result of an invoked CLI script."""    def __init__(        self,        runner: "CliRunner",        stdout_bytes: bytes,        stderr_bytes: t.Optional[bytes],        return_value: t.Any,        exit_code: int,        exception: t.Optional[BaseException],        exc_info: t.Optional[            t.Tuple[t.Type[BaseException], BaseException, TracebackType]        ] = None,    ):        #: The runner that created the result        self.runner = runner        #: The standard output as bytes.        self.stdout_bytes = stdout_bytes        #: The standard error as bytes, or None if not available        self.stderr_bytes = stderr_bytes        #: The value returned from the invoked command.        #:        #: .. versionadded:: 8.0        self.return_value = return_value        #: The exit code as integer.        self.exit_code = exit_code        #: The exception that happened if one did.        self.exception = exception        #: The traceback        self.exc_info = exc_info    @property    def output(self) -> str:        """The (standard) output as unicode string."""        return self.stdout    @property    def stdout(self) -> str:        """The standard output as unicode string."""        return self.stdout_bytes.decode(self.runner.charset, "replace").replace(            "\r\n", "\n"        )    @property    def stderr(self) -> str:        """The standard error as unicode string."""        if self.stderr_bytes is None:            raise ValueError("stderr not separately captured")        return self.stderr_bytes.decode(self.runner.charset, "replace").replace(            "\r\n", "\n"        )    def __repr__(self) -> str:        exc_str = repr(self.exception) if self.exception else "okay"        return f"<{type(self).__name__} {exc_str}>"class CliRunner:    """The CLI runner provides functionality to invoke a Click command line    script for unittesting purposes in a isolated environment.  This only    works in single-threaded systems without any concurrency as it changes the    global interpreter state.    :param charset: the character set for the input and output data.    :param env: a dictionary with environment variables for overriding.    :param echo_stdin: if this is set to `True`, then reading from stdin writes                       to stdout.  This is useful for showing examples in                       some circumstances.  Note that regular prompts                       will automatically echo the input.    :param mix_stderr: if this is set to `False`, then stdout and stderr are                       preserved as independent streams.  This is useful for                       Unix-philosophy apps that have predictable stdout and                       noisy stderr, such that each may be measured                       independently    """    def __init__(        self,        charset: str = "utf-8",        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,        echo_stdin: bool = False,        mix_stderr: bool = True,    ) -> None:        self.charset = charset        self.env: t.Mapping[str, t.Optional[str]] = env or {}        self.echo_stdin = echo_stdin        self.mix_stderr = mix_stderr    def get_default_prog_name(self, cli: "BaseCommand") -> str:        """Given a command object it will return the default program name        for it.  The default is the `name` attribute or ``"root"`` if not        set.        """        return cli.name or "root"    def make_env(        self, overrides: t.Optional[t.Mapping[str, t.Optional[str]]] = None    ) -> t.Mapping[str, t.Optional[str]]:        """Returns the environment overrides for invoking a script."""        rv = dict(self.env)        if overrides:            rv.update(overrides)        return rv    @contextlib.contextmanager    def isolation(        self,        input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None,        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,        color: bool = False,    ) -> t.Iterator[t.Tuple[io.BytesIO, t.Optional[io.BytesIO]]]:        """A context manager that sets up the isolation for invoking of a        command line tool.  This sets up stdin with the given input data        and `os.environ` with the overrides from the given dictionary.        This also rebinds some internals in Click to be mocked (like the        prompt functionality).        This is automatically done in the :meth:`invoke` method.        :param input: the input stream to put into sys.stdin.        :param env: the environment overrides as dictionary.        :param color: whether the output should contain color codes. The                      application can still override this explicitly.        .. versionchanged:: 8.0            ``stderr`` is opened with ``errors="backslashreplace"``            instead of the default ``"strict"``.        .. versionchanged:: 4.0            Added the ``color`` parameter.        """        bytes_input = make_input_stream(input, self.charset)        echo_input = None        old_stdin = sys.stdin        old_stdout = sys.stdout        old_stderr = sys.stderr        old_forced_width = formatting.FORCED_WIDTH        formatting.FORCED_WIDTH = 80        env = self.make_env(env)        bytes_output = io.BytesIO()        if self.echo_stdin:            bytes_input = echo_input = t.cast(                t.BinaryIO, EchoingStdin(bytes_input, bytes_output)            )        sys.stdin = text_input = _NamedTextIOWrapper(            bytes_input, encoding=self.charset, name="<stdin>", mode="r"        )        if self.echo_stdin:            # Force unbuffered reads, otherwise TextIOWrapper reads a            # large chunk which is echoed early.            text_input._CHUNK_SIZE = 1  # type: ignore        sys.stdout = _NamedTextIOWrapper(            bytes_output, encoding=self.charset, name="<stdout>", mode="w"        )        bytes_error = None        if self.mix_stderr:            sys.stderr = sys.stdout        else:            bytes_error = io.BytesIO()            sys.stderr = _NamedTextIOWrapper(                bytes_error,                encoding=self.charset,                name="<stderr>",                mode="w",                errors="backslashreplace",            )        @_pause_echo(echo_input)  # type: ignore        def visible_input(prompt: t.Optional[str] = None) -> str:            sys.stdout.write(prompt or "")            val = text_input.readline().rstrip("\r\n")            sys.stdout.write(f"{val}\n")            sys.stdout.flush()            return val        @_pause_echo(echo_input)  # type: ignore        def hidden_input(prompt: t.Optional[str] = None) -> str:            sys.stdout.write(f"{prompt or ''}\n")            sys.stdout.flush()            return text_input.readline().rstrip("\r\n")        @_pause_echo(echo_input)  # type: ignore        def _getchar(echo: bool) -> str:            char = sys.stdin.read(1)            if echo:                sys.stdout.write(char)            sys.stdout.flush()            return char        default_color = color        def should_strip_ansi(            stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None        ) -> bool:            if color is None:                return not default_color            return not color        old_visible_prompt_func = termui.visible_prompt_func        old_hidden_prompt_func = termui.hidden_prompt_func        old__getchar_func = termui._getchar        old_should_strip_ansi = utils.should_strip_ansi  # type: ignore        termui.visible_prompt_func = visible_input        termui.hidden_prompt_func = hidden_input        termui._getchar = _getchar        utils.should_strip_ansi = should_strip_ansi  # type: ignore        old_env = {}        try:            for key, value in env.items():                old_env[key] = os.environ.get(key)                if value is None:                    try:                        del os.environ[key]                    except Exception:                        pass                else:                    os.environ[key] = value            yield (bytes_output, bytes_error)        finally:            for key, value in old_env.items():                if value is None:                    try:                        del os.environ[key]                    except Exception:                        pass                else:                    os.environ[key] = value            sys.stdout = old_stdout            sys.stderr = old_stderr            sys.stdin = old_stdin            termui.visible_prompt_func = old_visible_prompt_func            termui.hidden_prompt_func = old_hidden_prompt_func            termui._getchar = old__getchar_func            utils.should_strip_ansi = old_should_strip_ansi  # type: ignore            formatting.FORCED_WIDTH = old_forced_width    def invoke(        self,        cli: "BaseCommand",        args: t.Optional[t.Union[str, t.Sequence[str]]] = None,        input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None,        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,        catch_exceptions: bool = True,        color: bool = False,        **extra: t.Any,    ) -> Result:        """Invokes a command in an isolated environment.  The arguments are        forwarded directly to the command line script, the `extra` keyword        arguments are passed to the :meth:`~clickpkg.Command.main` function of        the command.        This returns a :class:`Result` object.        :param cli: the command to invoke        :param args: the arguments to invoke. It may be given as an iterable                     or a string. When given as string it will be interpreted                     as a Unix shell command. More details at                     :func:`shlex.split`.        :param input: the input data for `sys.stdin`.        :param env: the environment overrides.        :param catch_exceptions: Whether to catch any other exceptions than                                 ``SystemExit``.        :param extra: the keyword arguments to pass to :meth:`main`.        :param color: whether the output should contain color codes. The                      application can still override this explicitly.        .. versionchanged:: 8.0            The result object has the ``return_value`` attribute with            the value returned from the invoked command.        .. versionchanged:: 4.0            Added the ``color`` parameter.        .. versionchanged:: 3.0            Added the ``catch_exceptions`` parameter.        .. versionchanged:: 3.0            The result object has the ``exc_info`` attribute with the            traceback if available.        """        exc_info = None        with self.isolation(input=input, env=env, color=color) as outstreams:            return_value = None            exception: t.Optional[BaseException] = None            exit_code = 0            if isinstance(args, str):                args = shlex.split(args)            try:                prog_name = extra.pop("prog_name")            except KeyError:                prog_name = self.get_default_prog_name(cli)            try:                return_value = cli.main(args=args or (), prog_name=prog_name, **extra)            except SystemExit as e:                exc_info = sys.exc_info()                e_code = t.cast(t.Optional[t.Union[int, t.Any]], e.code)                if e_code is None:                    e_code = 0                if e_code != 0:                    exception = e                if not isinstance(e_code, int):                    sys.stdout.write(str(e_code))                    sys.stdout.write("\n")                    e_code = 1                exit_code = e_code            except Exception as e:                if not catch_exceptions:                    raise                exception = e                exit_code = 1                exc_info = sys.exc_info()            finally:                sys.stdout.flush()                stdout = outstreams[0].getvalue()                if self.mix_stderr:                    stderr = None                else:                    stderr = outstreams[1].getvalue()  # type: ignore        return Result(            runner=self,            stdout_bytes=stdout,            stderr_bytes=stderr,            return_value=return_value,            exit_code=exit_code,            exception=exception,            exc_info=exc_info,  # type: ignore        )    @contextlib.contextmanager    def isolated_filesystem(        self, temp_dir: t.Optional[t.Union[str, "os.PathLike[str]"]] = None    ) -> t.Iterator[str]:        """A context manager that creates a temporary directory and        changes the current working directory to it. This isolates tests        that affect the contents of the CWD to prevent them from        interfering with each other.        :param temp_dir: Create the temporary directory under this            directory. If given, the created directory is not removed            when exiting.        .. versionchanged:: 8.0            Added the ``temp_dir`` parameter.        """        cwd = os.getcwd()        dt = tempfile.mkdtemp(dir=temp_dir)        os.chdir(dt)        try:            yield dt        finally:            os.chdir(cwd)            if temp_dir is None:                try:                    shutil.rmtree(dt)                except OSError:  # noqa: B014                    pass
 |