# This module is based on the excellent work by Adam Bartoš who
# provided a lot of what went into the implementation here in
# the discussion to issue1602 in the Python bug tracker.
#
# There are some general differences in regards to how this works
# compared to the original patches as we do not need to patch
# the entire interpreter but just work in our little world of
# echo and prompt.
import io
import sys
import time
import typing as t
from ctypes import byref
from ctypes import c_char
from ctypes import c_char_p
from ctypes import c_int
from ctypes import c_ssize_t
from ctypes import c_ulong
from ctypes import c_void_p
from ctypes import POINTER
from ctypes import py_object
from ctypes import Structure
from ctypes.wintypes import DWORD
from ctypes.wintypes import HANDLE
from ctypes.wintypes import LPCWSTR
from ctypes.wintypes import LPWSTR

from ._compat import _NonClosingTextIOWrapper

assert sys.platform == "win32"
import msvcrt  # noqa: E402
from ctypes import windll  # noqa: E402
from ctypes import WINFUNCTYPE  # noqa: E402

c_ssize_p = POINTER(c_ssize_t)

# Win32 entry points used by the console reader/writer below.
kernel32 = windll.kernel32
GetStdHandle = kernel32.GetStdHandle
ReadConsoleW = kernel32.ReadConsoleW
WriteConsoleW = kernel32.WriteConsoleW
GetConsoleMode = kernel32.GetConsoleMode
GetLastError = kernel32.GetLastError
GetCommandLineW = WINFUNCTYPE(LPWSTR)(("GetCommandLineW", windll.kernel32))
CommandLineToArgvW = WINFUNCTYPE(POINTER(LPWSTR), LPCWSTR, POINTER(c_int))(
    ("CommandLineToArgvW", windll.shell32)
)
LocalFree = WINFUNCTYPE(c_void_p, c_void_p)(("LocalFree", windll.kernel32))

# -10 / -11 / -12 are the Win32 STD_INPUT_HANDLE / STD_OUTPUT_HANDLE /
# STD_ERROR_HANDLE selectors accepted by GetStdHandle.
STDIN_HANDLE = GetStdHandle(-10)
STDOUT_HANDLE = GetStdHandle(-11)
STDERR_HANDLE = GetStdHandle(-12)

# Flags passed to PyObject_GetBuffer.
PyBUF_SIMPLE = 0
PyBUF_WRITABLE = 1

# Windows error codes this module knows by name.
ERROR_SUCCESS = 0
ERROR_NOT_ENOUGH_MEMORY = 8
ERROR_OPERATION_ABORTED = 995

STDIN_FILENO = 0
STDOUT_FILENO = 1
STDERR_FILENO = 2

# First byte of a console read that is treated as end-of-file (0x1A, Ctrl+Z).
EOF = b"\x1a"
# Upper bound on the number of bytes handed to a single WriteConsoleW call.
MAX_BYTES_WRITTEN = 32767

try:
    from ctypes import pythonapi
except ImportError:
    # On PyPy we cannot get buffers so our ability to operate here is
    # severely limited.
    get_buffer = None
else:

    class Py_buffer(Structure):
        """ctypes layout handed to ``PyObject_GetBuffer``/``PyBuffer_Release``."""

        _fields_ = [
            ("buf", c_void_p),
            ("obj", py_object),
            ("len", c_ssize_t),
            ("itemsize", c_ssize_t),
            ("readonly", c_int),
            ("ndim", c_int),
            ("format", c_char_p),
            ("shape", c_ssize_p),
            ("strides", c_ssize_p),
            ("suboffsets", c_ssize_p),
            ("internal", c_void_p),
        ]

    PyObject_GetBuffer = pythonapi.PyObject_GetBuffer
    PyBuffer_Release = pythonapi.PyBuffer_Release

    def get_buffer(obj, writable=False):
        """Return a ctypes ``c_char`` array located at *obj*'s buffer memory.

        :param obj: object passed to ``PyObject_GetBuffer``.
        :param writable: request the buffer with ``PyBUF_WRITABLE`` instead
            of ``PyBUF_SIMPLE``.
        :return: a ``c_char * len`` array created with ``from_address`` on
            the buffer's start address.
        """
        buf = Py_buffer()
        flags = PyBUF_WRITABLE if writable else PyBUF_SIMPLE
        PyObject_GetBuffer(py_object(obj), byref(buf), flags)

        try:
            buffer_type = c_char * buf.len
            return buffer_type.from_address(buf.buf)
        finally:
            # The buffer export is released before returning; the array
            # returned above only refers to the address.
            # NOTE(review): callers presumably must keep ``obj`` alive while
            # using the result - confirm.
            PyBuffer_Release(byref(buf))


class _WindowsConsoleRawIOBase(io.RawIOBase):
    """Common base for the raw console reader and writer.

    :param handle: console handle value; wrapped in ``HANDLE`` on each
        ``ReadConsoleW``/``WriteConsoleW`` call made by the subclasses.
    """

    def __init__(self, handle):
        self.handle = handle

    def isatty(self):
        """Always report a TTY once the base-class call has succeeded."""
        # The base implementation's return value is discarded; it is called
        # only for whatever checks it performs (presumably the closed-file
        # check of ``io.IOBase``).
        super().isatty()
        return True


class _WindowsConsoleReader(_WindowsConsoleRawIOBase):
    """Raw reader that fills buffers via ``ReadConsoleW``."""

    def readable(self):
        return True

    def readinto(self, b):
        """Read console input into *b* and return the number of bytes stored.

        Two bytes are accounted for per code unit reported by
        ``ReadConsoleW``.

        :return: ``0`` when *b* is empty or when the first byte read equals
            ``EOF``; otherwise twice the number of code units read.
        :raises ValueError: if ``len(b)`` is odd.
        :raises OSError: if ``ReadConsoleW`` returns a false value.
        """
        bytes_to_be_read = len(b)

        if not bytes_to_be_read:
            return 0
        elif bytes_to_be_read % 2:
            raise ValueError(
                "cannot read odd number of bytes from UTF-16-LE encoded console"
            )

        buffer = get_buffer(b, writable=True)
        code_units_to_be_read = bytes_to_be_read // 2
        code_units_read = c_ulong()

        rv = ReadConsoleW(
            HANDLE(self.handle),
            buffer,
            code_units_to_be_read,
            byref(code_units_read),
            None,
        )

        # Capture the error code once, right after the call, so the value
        # reported below is the same one that was tested (the sleep in
        # between could otherwise change what a second query returns).
        last_error = GetLastError()

        if last_error == ERROR_OPERATION_ABORTED:
            # wait for KeyboardInterrupt
            time.sleep(0.1)

        if not rv:
            raise OSError(f"Windows error: {last_error}")

        if buffer[0] == EOF:
            return 0

        return 2 * code_units_read.value


class _WindowsConsoleWriter(_WindowsConsoleRawIOBase):
    """Raw writer that emits buffers via ``WriteConsoleW``."""

    def writable(self):
        return True

    @staticmethod
    def _get_error_message(errno):
        """Return a symbolic name for the known codes, else a generic text."""
        if errno == ERROR_SUCCESS:
            return "ERROR_SUCCESS"
        elif errno == ERROR_NOT_ENOUGH_MEMORY:
            return "ERROR_NOT_ENOUGH_MEMORY"

        return f"Windows error {errno}"

    def write(self, b):
        """Write *b* to the console and return the number of bytes consumed.

        At most ``MAX_BYTES_WRITTEN`` bytes are offered to ``WriteConsoleW``
        per call, so the return value may be smaller than ``len(b)``.

        :raises OSError: if nothing was written although *b* is non-empty.
        """
        bytes_to_be_written = len(b)
        buf = get_buffer(b)
        # Two bytes per code unit; cap the size of a single console write.
        code_units_to_be_written = min(bytes_to_be_written, MAX_BYTES_WRITTEN) // 2
        code_units_written = c_ulong()

        WriteConsoleW(
            HANDLE(self.handle),
            buf,
            code_units_to_be_written,
            byref(code_units_written),
            None,
        )
        bytes_written = 2 * code_units_written.value

        if bytes_written == 0 and bytes_to_be_written > 0:
            raise OSError(self._get_error_message(GetLastError()))

        return bytes_written


class ConsoleStream:
    """Pair a console text stream with the original byte stream.

    ``str`` data is written to the text stream; anything else goes to the
    byte stream exposed as ``buffer``. Attributes not defined here are
    looked up on the text stream.
    """

    def __init__(self, text_stream: t.TextIO, byte_stream: t.BinaryIO) -> None:
        self._text_stream = text_stream
        self.buffer = byte_stream

    @property
    def name(self) -> str:
        return self.buffer.name

    def write(self, x: t.AnyStr) -> int:
        """Write *x*, routing ``str`` to the text stream and the rest to ``buffer``."""
        if isinstance(x, str):
            return self._text_stream.write(x)

        # Best effort: try to flush before writing to the byte stream, but
        # never let a failing flush prevent the write.
        try:
            self.flush()
        except Exception:
            pass

        return self.buffer.write(x)

    def writelines(self, lines: t.Iterable[t.AnyStr]) -> None:
        for line in lines:
            self.write(line)

    def __getattr__(self, name: str) -> t.Any:
        # ``__getattr__`` only runs when normal lookup fails. If the wrapped
        # stream itself is missing (instance created without ``__init__``),
        # delegating would recurse forever, so fail with AttributeError.
        if name == "_text_stream":
            raise AttributeError(name)

        return getattr(self._text_stream, name)

    def isatty(self) -> bool:
        return self.buffer.isatty()

    def __repr__(self) -> str:
        return f"<ConsoleStream name={self.name!r} encoding={self.encoding!r}>"


def _get_text_stdin(buffer_stream: t.BinaryIO) -> t.TextIO:
    """Build a ``ConsoleStream`` that reads UTF-16-LE text from ``STDIN_HANDLE``."""
    text_stream = _NonClosingTextIOWrapper(
        io.BufferedReader(_WindowsConsoleReader(STDIN_HANDLE)),
        "utf-16-le",
        "strict",
        line_buffering=True,
    )
    return t.cast(t.TextIO, ConsoleStream(text_stream, buffer_stream))


def _get_text_stdout(buffer_stream: t.BinaryIO) -> t.TextIO:
    """Build a ``ConsoleStream`` that writes UTF-16-LE text to ``STDOUT_HANDLE``."""
    text_stream = _NonClosingTextIOWrapper(
        io.BufferedWriter(_WindowsConsoleWriter(STDOUT_HANDLE)),
        "utf-16-le",
        "strict",
        line_buffering=True,
    )
    return t.cast(t.TextIO, ConsoleStream(text_stream, buffer_stream))


def _get_text_stderr(buffer_stream: t.BinaryIO) -> t.TextIO:
    """Build a ``ConsoleStream`` that writes UTF-16-LE text to ``STDERR_HANDLE``."""
    text_stream = _NonClosingTextIOWrapper(
        io.BufferedWriter(_WindowsConsoleWriter(STDERR_HANDLE)),
        "utf-16-le",
        "strict",
        line_buffering=True,
    )
    return t.cast(t.TextIO, ConsoleStream(text_stream, buffer_stream))


# File descriptor -> factory producing the matching console stream.
_stream_factories: t.Mapping[int, t.Callable[[t.BinaryIO], t.TextIO]] = {
    STDIN_FILENO: _get_text_stdin,
    STDOUT_FILENO: _get_text_stdout,
    STDERR_FILENO: _get_text_stderr,
}


def _is_console(f: t.TextIO) -> bool:
    """Return whether *f* is backed by a handle that ``GetConsoleMode`` accepts.

    Returns ``False`` when *f* has no ``fileno`` attribute or when calling
    it raises ``OSError`` or ``io.UnsupportedOperation``.
    """
    if not hasattr(f, "fileno"):
        return False

    try:
        fileno = f.fileno()
    except (OSError, io.UnsupportedOperation):
        return False

    handle = msvcrt.get_osfhandle(fileno)
    return bool(GetConsoleMode(handle, byref(DWORD())))


def _get_windows_console_stream(
    f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
) -> t.Optional[t.TextIO]:
    """Return a console-backed replacement for *f*, or ``None``.

    A replacement is produced only when all of the following hold:
    ``get_buffer`` is available, *encoding* is ``"utf-16-le"`` or ``None``,
    *errors* is ``"strict"`` or ``None``, *f* is a console, its file
    descriptor has an entry in ``_stream_factories``, and *f* exposes a
    non-``None`` ``buffer`` attribute.
    """
    if (
        get_buffer is not None
        and encoding in {"utf-16-le", None}
        and errors in {"strict", None}
        and _is_console(f)
    ):
        func = _stream_factories.get(f.fileno())

        if func is not None:
            b = getattr(f, "buffer", None)

            if b is None:
                return None

            return func(b)

    return None