| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623 | import codecsimport ioimport osimport reimport sysimport typing as tfrom weakref import WeakKeyDictionaryCYGWIN = sys.platform.startswith("cygwin")WIN = sys.platform.startswith("win")auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")def _make_text_stream(    stream: t.BinaryIO,    encoding: t.Optional[str],    errors: t.Optional[str],    force_readable: bool = False,    force_writable: bool = False,) -> t.TextIO:    if encoding is None:        encoding = get_best_encoding(stream)    if errors is None:        errors = "replace"    return _NonClosingTextIOWrapper(        stream,        encoding,        errors,        line_buffering=True,        force_readable=force_readable,        force_writable=force_writable,    )def is_ascii_encoding(encoding: str) -> bool:    """Checks if a given encoding is ascii."""    try:        return codecs.lookup(encoding).name == "ascii"    except LookupError:        return Falsedef get_best_encoding(stream: t.IO[t.Any]) -> str:    """Returns the default stream encoding if not found."""    rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()    if is_ascii_encoding(rv):        return "utf-8"    return rvclass _NonClosingTextIOWrapper(io.TextIOWrapper):    def __init__(        self,        stream: t.BinaryIO,        encoding: t.Optional[str],        errors: t.Optional[str],        force_readable: bool = False,        force_writable: bool = False,        **extra: t.Any,    ) -> None:        self._stream = stream = t.cast(            t.BinaryIO, _FixupStream(stream, force_readable, force_writable)        )        super().__init__(stream, encoding, errors, **extra)    def __del__(self) -> None:        try:            self.detach()        except Exception:            pass    def isatty(self) -> bool:        # https://bitbucket.org/pypy/pypy/issue/1803        return self._stream.isatty()class _FixupStream:    """The new io interface needs more from streams than streams    traditionally implement.  As such, this fix-up code is necessary in    some circumstances.    The forcing of readable and writable flags are there because some tools    put badly patched objects on sys (one such offender are certain version    of jupyter notebook).    """    def __init__(        self,        stream: t.BinaryIO,        force_readable: bool = False,        force_writable: bool = False,    ):        self._stream = stream        self._force_readable = force_readable        self._force_writable = force_writable    def __getattr__(self, name: str) -> t.Any:        return getattr(self._stream, name)    def read1(self, size: int) -> bytes:        f = getattr(self._stream, "read1", None)        if f is not None:            return t.cast(bytes, f(size))        return self._stream.read(size)    def readable(self) -> bool:        if self._force_readable:            return True        x = getattr(self._stream, "readable", None)        if x is not None:            return t.cast(bool, x())        try:            self._stream.read(0)        except Exception:            return False        return True    def writable(self) -> bool:        if self._force_writable:            return True        x = getattr(self._stream, "writable", None)        if x is not None:            return t.cast(bool, x())        try:            self._stream.write("")  # type: ignore        except Exception:            try:                self._stream.write(b"")            except Exception:                return False        return True    def seekable(self) -> bool:        x = getattr(self._stream, "seekable", None)        if x is not None:            return t.cast(bool, x())        try:            self._stream.seek(self._stream.tell())        except Exception:            return False        return Truedef _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:    try:        return isinstance(stream.read(0), bytes)    except Exception:        return default        # This happens in some cases where the stream was already        # closed.  In this case, we assume the default.def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:    try:        stream.write(b"")    except Exception:        try:            stream.write("")            return False        except Exception:            pass        return default    return Truedef _find_binary_reader(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:    # We need to figure out if the given stream is already binary.    # This can happen because the official docs recommend detaching    # the streams to get binary streams.  Some code might do this, so    # we need to deal with this case explicitly.    if _is_binary_reader(stream, False):        return t.cast(t.BinaryIO, stream)    buf = getattr(stream, "buffer", None)    # Same situation here; this time we assume that the buffer is    # actually binary in case it's closed.    if buf is not None and _is_binary_reader(buf, True):        return t.cast(t.BinaryIO, buf)    return Nonedef _find_binary_writer(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:    # We need to figure out if the given stream is already binary.    # This can happen because the official docs recommend detaching    # the streams to get binary streams.  Some code might do this, so    # we need to deal with this case explicitly.    if _is_binary_writer(stream, False):        return t.cast(t.BinaryIO, stream)    buf = getattr(stream, "buffer", None)    # Same situation here; this time we assume that the buffer is    # actually binary in case it's closed.    if buf is not None and _is_binary_writer(buf, True):        return t.cast(t.BinaryIO, buf)    return Nonedef _stream_is_misconfigured(stream: t.TextIO) -> bool:    """A stream is misconfigured if its encoding is ASCII."""    # If the stream does not have an encoding set, we assume it's set    # to ASCII.  This appears to happen in certain unittest    # environments.  It's not quite clear what the correct behavior is    # but this at least will force Click to recover somehow.    return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool:    """A stream attribute is compatible if it is equal to the    desired value or the desired value is unset and the attribute    has a value.    """    stream_value = getattr(stream, attr, None)    return stream_value == value or (value is None and stream_value is not None)def _is_compatible_text_stream(    stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]) -> bool:    """Check if a stream's encoding and errors attributes are    compatible with the desired values.    """    return _is_compat_stream_attr(        stream, "encoding", encoding    ) and _is_compat_stream_attr(stream, "errors", errors)def _force_correct_text_stream(    text_stream: t.IO[t.Any],    encoding: t.Optional[str],    errors: t.Optional[str],    is_binary: t.Callable[[t.IO[t.Any], bool], bool],    find_binary: t.Callable[[t.IO[t.Any]], t.Optional[t.BinaryIO]],    force_readable: bool = False,    force_writable: bool = False,) -> t.TextIO:    if is_binary(text_stream, False):        binary_reader = t.cast(t.BinaryIO, text_stream)    else:        text_stream = t.cast(t.TextIO, text_stream)        # If the stream looks compatible, and won't default to a        # misconfigured ascii encoding, return it as-is.        if _is_compatible_text_stream(text_stream, encoding, errors) and not (            encoding is None and _stream_is_misconfigured(text_stream)        ):            return text_stream        # Otherwise, get the underlying binary reader.        possible_binary_reader = find_binary(text_stream)        # If that's not possible, silently use the original reader        # and get mojibake instead of exceptions.        if possible_binary_reader is None:            return text_stream        binary_reader = possible_binary_reader    # Default errors to replace instead of strict in order to get    # something that works.    if errors is None:        errors = "replace"    # Wrap the binary stream in a text stream with the correct    # encoding parameters.    return _make_text_stream(        binary_reader,        encoding,        errors,        force_readable=force_readable,        force_writable=force_writable,    )def _force_correct_text_reader(    text_reader: t.IO[t.Any],    encoding: t.Optional[str],    errors: t.Optional[str],    force_readable: bool = False,) -> t.TextIO:    return _force_correct_text_stream(        text_reader,        encoding,        errors,        _is_binary_reader,        _find_binary_reader,        force_readable=force_readable,    )def _force_correct_text_writer(    text_writer: t.IO[t.Any],    encoding: t.Optional[str],    errors: t.Optional[str],    force_writable: bool = False,) -> t.TextIO:    return _force_correct_text_stream(        text_writer,        encoding,        errors,        _is_binary_writer,        _find_binary_writer,        force_writable=force_writable,    )def get_binary_stdin() -> t.BinaryIO:    reader = _find_binary_reader(sys.stdin)    if reader is None:        raise RuntimeError("Was not able to determine binary stream for sys.stdin.")    return readerdef get_binary_stdout() -> t.BinaryIO:    writer = _find_binary_writer(sys.stdout)    if writer is None:        raise RuntimeError("Was not able to determine binary stream for sys.stdout.")    return writerdef get_binary_stderr() -> t.BinaryIO:    writer = _find_binary_writer(sys.stderr)    if writer is None:        raise RuntimeError("Was not able to determine binary stream for sys.stderr.")    return writerdef get_text_stdin(    encoding: t.Optional[str] = None, errors: t.Optional[str] = None) -> t.TextIO:    rv = _get_windows_console_stream(sys.stdin, encoding, errors)    if rv is not None:        return rv    return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True)def get_text_stdout(    encoding: t.Optional[str] = None, errors: t.Optional[str] = None) -> t.TextIO:    rv = _get_windows_console_stream(sys.stdout, encoding, errors)    if rv is not None:        return rv    return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True)def get_text_stderr(    encoding: t.Optional[str] = None, errors: t.Optional[str] = None) -> t.TextIO:    rv = _get_windows_console_stream(sys.stderr, encoding, errors)    if rv is not None:        return rv    return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True)def _wrap_io_open(    file: t.Union[str, "os.PathLike[str]", int],    mode: str,    encoding: t.Optional[str],    errors: t.Optional[str],) -> t.IO[t.Any]:    """Handles not passing ``encoding`` and ``errors`` in binary mode."""    if "b" in mode:        return open(file, mode)    return open(file, mode, encoding=encoding, errors=errors)def open_stream(    filename: "t.Union[str, os.PathLike[str]]",    mode: str = "r",    encoding: t.Optional[str] = None,    errors: t.Optional[str] = "strict",    atomic: bool = False,) -> t.Tuple[t.IO[t.Any], bool]:    binary = "b" in mode    filename = os.fspath(filename)    # Standard streams first. These are simple because they ignore the    # atomic flag. Use fsdecode to handle Path("-").    if os.fsdecode(filename) == "-":        if any(m in mode for m in ["w", "a", "x"]):            if binary:                return get_binary_stdout(), False            return get_text_stdout(encoding=encoding, errors=errors), False        if binary:            return get_binary_stdin(), False        return get_text_stdin(encoding=encoding, errors=errors), False    # Non-atomic writes directly go out through the regular open functions.    if not atomic:        return _wrap_io_open(filename, mode, encoding, errors), True    # Some usability stuff for atomic writes    if "a" in mode:        raise ValueError(            "Appending to an existing file is not supported, because that"            " would involve an expensive `copy`-operation to a temporary"            " file. Open the file in normal `w`-mode and copy explicitly"            " if that's what you're after."        )    if "x" in mode:        raise ValueError("Use the `overwrite`-parameter instead.")    if "w" not in mode:        raise ValueError("Atomic writes only make sense with `w`-mode.")    # Atomic writes are more complicated.  They work by opening a file    # as a proxy in the same folder and then using the fdopen    # functionality to wrap it in a Python file.  Then we wrap it in an    # atomic file that moves the file over on close.    import errno    import random    try:        perm: t.Optional[int] = os.stat(filename).st_mode    except OSError:        perm = None    flags = os.O_RDWR | os.O_CREAT | os.O_EXCL    if binary:        flags |= getattr(os, "O_BINARY", 0)    while True:        tmp_filename = os.path.join(            os.path.dirname(filename),            f".__atomic-write{random.randrange(1 << 32):08x}",        )        try:            fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)            break        except OSError as e:            if e.errno == errno.EEXIST or (                os.name == "nt"                and e.errno == errno.EACCES                and os.path.isdir(e.filename)                and os.access(e.filename, os.W_OK)            ):                continue            raise    if perm is not None:        os.chmod(tmp_filename, perm)  # in case perm includes bits in umask    f = _wrap_io_open(fd, mode, encoding, errors)    af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))    return t.cast(t.IO[t.Any], af), Trueclass _AtomicFile:    def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:        self._f = f        self._tmp_filename = tmp_filename        self._real_filename = real_filename        self.closed = False    @property    def name(self) -> str:        return self._real_filename    def close(self, delete: bool = False) -> None:        if self.closed:            return        self._f.close()        os.replace(self._tmp_filename, self._real_filename)        self.closed = True    def __getattr__(self, name: str) -> t.Any:        return getattr(self._f, name)    def __enter__(self) -> "_AtomicFile":        return self    def __exit__(self, exc_type: t.Optional[t.Type[BaseException]], *_: t.Any) -> None:        self.close(delete=exc_type is not None)    def __repr__(self) -> str:        return repr(self._f)def strip_ansi(value: str) -> str:    return _ansi_re.sub("", value)def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:    while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):        stream = stream._stream    return stream.__class__.__module__.startswith("ipykernel.")def should_strip_ansi(    stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None) -> bool:    if color is None:        if stream is None:            stream = sys.stdin        return not isatty(stream) and not _is_jupyter_kernel_output(stream)    return not color# On Windows, wrap the output streams with colorama to support ANSI# color codes.# NOTE: double check is needed so mypy does not analyze this on Linuxif sys.platform.startswith("win") and WIN:    from ._winconsole import _get_windows_console_stream    def _get_argv_encoding() -> str:        import locale        return locale.getpreferredencoding()    _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()    def auto_wrap_for_ansi(  # noqa: F811        stream: t.TextIO, color: t.Optional[bool] = None    ) -> t.TextIO:        """Support ANSI color and style codes on Windows by wrapping a        stream with colorama.        """        try:            cached = _ansi_stream_wrappers.get(stream)        except Exception:            cached = None        if cached is not None:            return cached        import colorama        strip = should_strip_ansi(stream, color)        ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)        rv = t.cast(t.TextIO, ansi_wrapper.stream)        _write = rv.write        def _safe_write(s):            try:                return _write(s)            except BaseException:                ansi_wrapper.reset_all()                raise        rv.write = _safe_write        try:            _ansi_stream_wrappers[stream] = rv        except Exception:            pass        return rvelse:    def _get_argv_encoding() -> str:        return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()    def _get_windows_console_stream(        f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]    ) -> t.Optional[t.TextIO]:        return Nonedef term_len(x: str) -> int:    return len(strip_ansi(x))def isatty(stream: t.IO[t.Any]) -> bool:    try:        return stream.isatty()    except Exception:        return Falsedef _make_cached_stream_func(    src_func: t.Callable[[], t.Optional[t.TextIO]],    wrapper_func: t.Callable[[], t.TextIO],) -> t.Callable[[], t.Optional[t.TextIO]]:    cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()    def func() -> t.Optional[t.TextIO]:        stream = src_func()        if stream is None:            return None        try:            rv = cache.get(stream)        except Exception:            rv = None        if rv is not None:            return rv        rv = wrapper_func()        try:            cache[stream] = rv        except Exception:            pass        return rv    return func_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = {    "stdin": get_binary_stdin,    "stdout": get_binary_stdout,    "stderr": get_binary_stderr,}text_streams: t.Mapping[    str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO]] = {    "stdin": get_text_stdin,    "stdout": get_text_stdout,    "stderr": get_text_stderr,}
 |